Created
December 14, 2024 09:23
-
-
Save Canx/e8e9b81646db8421657d6aec645e6991 to your computer and use it in GitHub Desktop.
Gemini 2 informative_indicators refactoring
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import logging | |
import pandas as pd | |
import pandas_ta as pta | |
import numpy as np | |
from pandas import DataFrame | |
from typing import Dict | |
log = logging.getLogger(__name__) | |
# Constants for indicator names | |
RSI_3 = "RSI_3" | |
RSI_14 = "RSI_14" | |
RSI_3_CHANGE_PCT = "RSI_3_change_pct" | |
RSI_14_CHANGE_PCT = "RSI_14_change_pct" | |
RSI_3_DIFF = "RSI_3_diff" | |
RSI_14_DIFF = "RSI_14_diff" | |
BBL_20_2 = "BBL_20_2.0" | |
BBM_20_2 = "BBM_20_2.0" | |
BBU_20_2 = "BBU_20_2.0" | |
BBB_20_2 = "BBB_20_2.0" | |
BBP_20_2 = "BBP_20_2.0" | |
MFI_14 = "MFI_14" | |
CMF_20 = "CMF_20" | |
WILLR_14 = "WILLR_14" | |
AROONU_14 = "AROONU_14" | |
AROOND_14 = "AROOND_14" | |
STOCHK_14_3_3 = "STOCHk_14_3_3" | |
STOCHD_14_3_3 = "STOCHd_14_3_3" | |
STOCHRSIK_14_14_3_3 = "STOCHRSIk_14_14_3_3" | |
STOCHRSID_14_14_3_3 = "STOCHRSId_14_14_3_3" | |
STOCHRSIK_14_14_3_3_CHANGE_PCT = "STOCHRSIk_14_14_3_3_change_pct" | |
KST_10_15_20_30_10_10_10_15 = "KST_10_15_20_30_10_10_10_15" | |
KSTs_9 = "KSTs_9" | |
UO_7_14_28 = "UO_7_14_28" | |
UO_7_14_28_CHANGE_PCT = "UO_7_14_28_change_pct" | |
OBV = "OBV" | |
OBV_CHANGE_PCT = "OBV_change_pct" | |
ROC_2 = "ROC_2" | |
ROC_9 = "ROC_9" | |
CCI_20 = "CCI_20" | |
CCI_20_CHANGE_PCT = "CCI_20_change_pct" | |
CHANGE_PCT = "change_pct" | |
TOP_WICK_PCT = "top_wick_pct" | |
BOT_WICK_PCT = "bot_wick_pct" | |
EMA_12 = "EMA_12" | |
EMA_200 = "EMA_200" | |
EMA_3 = "EMA_3" | |
EMA_9 = "EMA_9" | |
EMA_16 = "EMA_16" | |
EMA_20 = "EMA_20" | |
EMA_26 = "EMA_26" | |
EMA_50 = "EMA_50" | |
SMA_16 = "SMA_16" | |
SMA_30 = "SMA_30" | |
WILLR_480 = "WILLR_480" | |
CLOSE_MAX_48 = "close_max_48" | |
NUM_EMPTY_288 = "num_empty_288" | |
class IndicatorCalculator: | |
def __init__(self, dp, config, num_cores_indicators_calc, bt_min_age_days): | |
self.dp = dp | |
self.config = config | |
self.num_cores_indicators_calc = num_cores_indicators_calc | |
self.bt_min_age_days = bt_min_age_days | |
# Configuration of indicator lengths - Make these configurable | |
self.indicator_lengths = { | |
"rsi_short": 3, | |
"rsi_long": 14, | |
"bbands_length": 20, | |
"mfi_length": 14, | |
"cmf_length": 20, | |
"willr_length": 14, | |
"willr_long_length": 480, | |
"aroon_length": 14, | |
"stoch_length": 14, | |
"stochrsi_length": 14, | |
"kst_length": [10,15,20,30,10,10,10,15], | |
"kst_signal_length": 9, | |
"uo_length": [7, 14, 28], | |
"roc_short_length": 2, | |
"roc_long_length": 9, | |
"cci_length": 20, | |
"ema_short": 12, | |
"ema_long": 200, | |
"ema_3": 3, | |
"ema_9": 9, | |
"ema_16": 16, | |
"ema_20": 20, | |
"ema_26": 26, | |
"ema_50": 50, | |
"sma_16": 16, | |
"sma_30": 30, | |
"obv_length": 1, | |
} | |
def _calculate_rsi(self, df: DataFrame) -> DataFrame: | |
"""Calculates RSI and its derivatives.""" | |
try: | |
log.debug(f"Calculating {RSI_3} and {RSI_14} ...") | |
df[RSI_3] = pta.rsi(df["close"], length=self.indicator_lengths["rsi_short"]) | |
df[RSI_14] = pta.rsi(df["close"], length=self.indicator_lengths["rsi_long"]) | |
df[RSI_3_CHANGE_PCT] = (df[RSI_3] - df[RSI_3].shift(1)) / (df[RSI_3].shift(1)) * 100.0 | |
df[RSI_14_CHANGE_PCT] = (df[RSI_14] - df[RSI_14].shift(1)) / (df[RSI_14].shift(1)) * 100.0 | |
df[RSI_3_DIFF] = df[RSI_3] - df[RSI_3].shift(1) | |
df[RSI_14_DIFF] = df[RSI_14] - df[RSI_14].shift(1) | |
except Exception as e: | |
log.error(f"Error calculating RSI: {e}") | |
return df | |
def _calculate_ema(self, df: DataFrame) -> DataFrame: | |
try: | |
log.debug(f"Calculating {EMA_12} and {EMA_200} ...") | |
df[EMA_12] = pta.ema(df["close"], length=self.indicator_lengths["ema_short"]) | |
df[EMA_200] = pta.ema(df["close"], length=self.indicator_lengths["ema_long"], fillna=0.0) | |
except Exception as e: | |
log.error(f"Error calculating EMA: {e}") | |
return df | |
def _calculate_bbands(self, df: DataFrame) -> DataFrame: | |
"""Calculates Bollinger Bands.""" | |
try: | |
log.debug(f"Calculating Bollinger Bands ...") | |
bbands = pta.bbands(df["close"], length=self.indicator_lengths["bbands_length"]) | |
if isinstance(bbands, pd.DataFrame): | |
df[BBL_20_2] = bbands[f"BBL_{self.indicator_lengths['bbands_length']}_2.0"] | |
df[BBM_20_2] = bbands[f"BBM_{self.indicator_lengths['bbands_length']}_2.0"] | |
df[BBU_20_2] = bbands[f"BBU_{self.indicator_lengths['bbands_length']}_2.0"] | |
df[BBB_20_2] = bbands[f"BBB_{self.indicator_lengths['bbands_length']}_2.0"] | |
df[BBP_20_2] = bbands[f"BBP_{self.indicator_lengths['bbands_length']}_2.0"] | |
else: | |
df[BBL_20_2] = np.nan | |
df[BBM_20_2] = np.nan | |
df[BBU_20_2] = np.nan | |
df[BBB_20_2] = np.nan | |
df[BBP_20_2] = np.nan | |
except Exception as e: | |
log.error(f"Error calculating Bollinger Bands: {e}") | |
return df | |
def _calculate_mfi(self, df: DataFrame) -> DataFrame: | |
"""Calculates Money Flow Index.""" | |
try: | |
log.debug(f"Calculating {MFI_14} ...") | |
df[MFI_14] = pta.mfi(df["high"], df["low"], df["close"], df["volume"], length=self.indicator_lengths["mfi_length"]) | |
except Exception as e: | |
log.error(f"Error calculating MFI: {e}") | |
return df | |
def _calculate_cmf(self, df: DataFrame) -> DataFrame: | |
"""Calculates Chaikin Money Flow.""" | |
try: | |
log.debug(f"Calculating {CMF_20} ...") | |
df[CMF_20] = pta.cmf(df["high"], df["low"], df["close"], df["volume"], length=self.indicator_lengths["cmf_length"]) | |
except Exception as e: | |
log.error(f"Error calculating CMF: {e}") | |
return df | |
def _calculate_willr(self, df: DataFrame) -> DataFrame: | |
"""Calculates Williams %R.""" | |
try: | |
log.debug(f"Calculating {WILLR_14} ...") | |
df[WILLR_14] = pta.willr(df["high"], df["low"], df["close"], length=self.indicator_lengths["willr_length"]) | |
if "willr_long_length" in self.indicator_lengths: | |
log.debug(f"Calculating {WILLR_480} ...") | |
df[WILLR_480] = pta.willr(df["high"], df["low"], df["close"], length=self.indicator_lengths["willr_long_length"]) | |
except Exception as e: | |
log.error(f"Error calculating Williams %R: {e}") | |
return df | |
def _calculate_aroon(self, df: DataFrame) -> DataFrame: | |
"""Calculates Aroon.""" | |
try: | |
log.debug(f"Calculating Aroon ...") | |
aroon = pta.aroon(df["high"], df["low"], length=self.indicator_lengths["aroon_length"]) | |
if isinstance(aroon, pd.DataFrame): | |
df[AROONU_14] = aroon[f"AROONU_{self.indicator_lengths['aroon_length']}"] | |
df[AROOND_14] = aroon[f"AROOND_{self.indicator_lengths['aroon_length']}"] | |
else: | |
df[AROONU_14] = np.nan | |
df[AROOND_14] = np.nan | |
except Exception as e: | |
log.error(f"Error calculating Aroon: {e}") | |
return df | |
def _calculate_stochastic(self, df: DataFrame) -> DataFrame: | |
"""Calculates Stochastic.""" | |
try: | |
log.debug(f"Calculating Stochastic ...") | |
stoch = pta.stoch(df["high"], df["low"], df["close"]) | |
if isinstance(stoch, pd.DataFrame): | |
df[STOCHK_14_3_3] = stoch[f"STOCHk_{self.indicator_lengths['stoch_length']}_3_3"] | |
df[STOCHD_14_3_3] = stoch[f"STOCHd_{self.indicator_lengths['stoch_length']}_3_3"] | |
else: | |
df[STOCHK_14_3_3] = np.nan | |
df[STOCHD_14_3_3] = np.nan | |
except Exception as e: | |
log.error(f"Error calculating Stochastic: {e}") | |
df[STOCHK_14_3_3] = np.nan | |
df[STOCHD_14_3_3] = np.nan | |
return df | |
def _calculate_stochrsi(self, df: DataFrame) -> DataFrame: | |
"""Calculates Stochastic RSI.""" | |
try: | |
log.debug(f"Calculating Stochastic RSI...") | |
stochrsi = pta.stochrsi(df["close"], length=self.indicator_lengths["stochrsi_length"]) | |
if isinstance(stochrsi, pd.DataFrame): | |
df[STOCHRSIK_14_14_3_3] = stochrsi[f"STOCHRSIk_{self.indicator_lengths['stochrsi_length']}_{self.indicator_lengths['stochrsi_length']}_3_3"] | |
df[STOCHRSID_14_14_3_3] = stochrsi[f"STOCHRSId_{self.indicator_lengths['stochrsi_length']}_{self.indicator_lengths['stochrsi_length']}_3_3"] | |
df[STOCHRSIK_14_14_3_3_CHANGE_PCT] = ( | |
(df[STOCHRSIK_14_14_3_3] - df[STOCHRSIK_14_14_3_3].shift(1)) | |
/ df[STOCHRSIK_14_14_3_3].shift(1) | |
) * 100.0 | |
else: | |
df[STOCHRSIK_14_14_3_3] = np.nan | |
df[STOCHRSID_14_14_3_3] = np.nan | |
df[STOCHRSIK_14_14_3_3_CHANGE_PCT] = np.nan | |
except Exception as e: | |
log.error(f"Error calculating Stochastic RSI: {e}") | |
df[STOCHRSIK_14_14_3_3] = np.nan | |
df[STOCHRSID_14_14_3_3] = np.nan | |
df[STOCHRSIK_14_14_3_3_CHANGE_PCT] = np.nan | |
return df | |
def _calculate_kst(self, df: DataFrame) -> DataFrame: | |
"""Calculates KST.""" | |
try: | |
log.debug(f"Calculating KST ...") | |
kst = pta.kst(df["close"], *self.indicator_lengths["kst_length"]) | |
if isinstance(kst, pd.DataFrame): | |
df[KST_10_15_20_30_10_10_10_15] = kst[f"KST_{'_'.join(map(str, self.indicator_lengths['kst_length']))}"] | |
df[KSTs_9] = kst[f"KSTs_{self.indicator_lengths['kst_signal_length']}"] | |
else: | |
df[KST_10_15_20_30_10_10_10_15] = np.nan | |
df[KSTs_9] = np.nan | |
except Exception as e: | |
log.error(f"Error calculating KST: {e}") | |
df[KST_10_15_20_30_10_10_10_15] = np.nan | |
df[KSTs_9] = np.nan | |
return df | |
def _calculate_uo(self, df: DataFrame) -> DataFrame: | |
"""Calculates Ultimate Oscillator.""" | |
try: | |
log.debug(f"Calculating UO ...") | |
df[UO_7_14_28] = pta.uo(df["high"], df["low"], df["close"], | |
length1=self.indicator_lengths["uo_length"][0], | |
length2=self.indicator_lengths["uo_length"][1], | |
length3=self.indicator_lengths["uo_length"][2]) | |
df[UO_7_14_28] = df[UO_7_14_28].astype(np.float64).replace(to_replace=[np.nan, None], value=(50.0)) | |
df[UO_7_14_28_CHANGE_PCT] = ( | |
(df[UO_7_14_28] - df[UO_7_14_28].shift(1)) | |
/ abs(df[UO_7_14_28].shift(1)) | |
) * 100.0 | |
except Exception as e: | |
log.error(f"Error calculating UO: {e}") | |
df[UO_7_14_28] = np.nan | |
df[UO_7_14_28_CHANGE_PCT] = np.nan | |
return df | |
def _calculate_obv(self, df: DataFrame) -> DataFrame: | |
"""Calculates On Balance Volume.""" | |
try: | |
log.debug(f"Calculating OBV ...") | |
df[OBV] = pta.obv(df["close"], df["volume"]) | |
df[OBV_CHANGE_PCT] = ((df[OBV] - df[OBV].shift(1)) / abs(df[OBV].shift(1))) * 100.0 | |
except Exception as e: | |
log.error(f"Error calculating OBV: {e}") | |
return df | |
def _calculate_roc(self, df: DataFrame) -> DataFrame: | |
"""Calculates Rate of Change.""" | |
try: | |
log.debug(f"Calculating ROC ...") | |
df[ROC_2] = pta.roc(df["close"], length=self.indicator_lengths["roc_short_length"]) | |
df[ROC_9] = pta.roc(df["close"], length=self.indicator_lengths["roc_long_length"]) | |
except Exception as e: | |
log.error(f"Error calculating ROC: {e}") | |
return df | |
def _calculate_cci(self, df: DataFrame) -> DataFrame: | |
"""Calculates Commodity Channel Index.""" | |
try: | |
log.debug(f"Calculating CCI ...") | |
df[CCI_20] = pta.cci(df["high"], df["low"], df["close"], length=self.indicator_lengths["cci_length"]) | |
df[CCI_20] = (df[CCI_20]).astype(np.float64).replace(to_replace=[np.nan, None], value=(0.0)) | |
df[CCI_20_CHANGE_PCT] = ( | |
(df[CCI_20] - df[CCI_20].shift(1)) / abs(df[CCI_20].shift(1)) | |
) * 100.0 | |
except Exception as e: | |
log.error(f"Error calculating CCI: {e}") | |
df[CCI_20] = np.nan | |
df[CCI_20_CHANGE_PCT] = np.nan | |
return df | |
def _calculate_candle_change(self, df: DataFrame) -> DataFrame: | |
"""Calculates candle change and wicks.""" | |
try: | |
log.debug(f"Calculating candle change and wicks...") | |
df[CHANGE_PCT] = (df["close"] - df["open"]) / df["open"] * 100.0 | |
df[TOP_WICK_PCT] = ( | |
(df["high"] - np.maximum(df["open"], df["close"])) | |
/ np.maximum(df["open"], df["close"]) | |
* 100.0 | |
) | |
df[BOT_WICK_PCT] = abs( | |
(df["low"] - np.minimum(df["open"], df["close"])) | |
/ np.minimum(df["open"], df["close"]) | |
* 100.0 | |
) | |
except Exception as e: | |
log.error(f"Error calculating candle changes: {e}") | |
return df | |
def _calculate_sma(self, df: DataFrame) -> DataFrame: | |
"""Calculates Simple Moving Average.""" | |
try: | |
log.debug(f"Calculating SMA...") | |
df[SMA_16] = pta.sma(df["close"], length=self.indicator_lengths["sma_16"]) | |
df[SMA_30] = pta.sma(df["close"], length=self.indicator_lengths["sma_30"]) | |
except Exception as e: | |
log.error(f"Error calculating SMA: {e}") | |
return df | |
def calculate_indicators(self, metadata: Dict, df: DataFrame) -> DataFrame: | |
"""Calculates all informative indicators for a given timeframe.""" | |
tik = time.perf_counter() | |
df = self._calculate_rsi(df) | |
df = self._calculate_ema(df) | |
df = self._calculate_bbands(df) | |
df = self._calculate_mfi(df) | |
df = self._calculate_cmf(df) | |
df = self._calculate_willr(df) | |
df = self._calculate_aroon(df) | |
df = self._calculate_stochastic(df) | |
df = self._calculate_stochrsi(df) | |
df = self._calculate_kst(df) | |
df = self._calculate_uo(df) | |
df = self._calculate_obv(df) | |
df = self._calculate_roc(df) | |
df = self._calculate_cci(df) | |
df = self._calculate_candle_change(df) | |
df = self._calculate_sma(df) | |
tok = time.perf_counter() | |
log.debug(f"[{metadata['pair']}] indicators took: {tok - tik:0.4f} seconds.") | |
return df | |
def informative_indicators(self, metadata: Dict, info_timeframe: str) -> DataFrame: | |
"""Retrieves dataframe, and calculates informative indicators.""" | |
tik = time.perf_counter() | |
assert self.dp, "DataProvider is required for multiple timeframes." | |
informative_df = self.dp.get_pair_dataframe(pair=metadata["pair"], timeframe=info_timeframe) | |
informative_df = self.calculate_indicators(metadata, informative_df) | |
tok = time.perf_counter() | |
log.debug(f"[{metadata['pair']}] informative_indicators took: {tok - tik:0.4f} seconds.") | |
return informative_df | |
def base_tf_5m_indicators(self, metadata: Dict, df: DataFrame) -> DataFrame: | |
"""Calculates base timeframe indicators.""" | |
tik = time.perf_counter() | |
df = self.calculate_indicators(metadata, df) | |
# Close max | |
df[CLOSE_MAX_48] = df["close"].rolling(48).max() | |
# Number of empty candles | |
df[NUM_EMPTY_288] = (df["volume"] <= 0).rolling(window=288, min_periods=288).sum() | |
# ----------------------------------------------------------------------------------------- | |
# Global protections | |
# ----------------------------------------------------------------------------------------- | |
if not self.config["runmode"].value in ("live", "dry_run"): | |
# Backtest age filter | |
df["bt_agefilter_ok"] = False | |
df.loc[df.index > (12 * 24 * self.bt_min_age_days), "bt_agefilter_ok"] = True | |
else: | |
# Exchange downtime protection | |
df["live_data_ok"] = df["volume"].rolling(window=72, min_periods=72).min() > 0 | |
# Performance logging | |
# ----------------------------------------------------------------------------------------- | |
tok = time.perf_counter() | |
log.debug(f"[{metadata['pair']}] base_tf_5m_indicators took: {tok - tik:0.4f} seconds.") | |
return df | |
def info_switcher(self, metadata: Dict, info_timeframe: str) -> DataFrame: | |
"""Routes to the correct informative indicator function.""" | |
if info_timeframe in ("1d","4h", "1h", "15m"): | |
return self.informative_indicators(metadata, info_timeframe) | |
else: | |
raise RuntimeError(f"{info_timeframe} not supported as informative timeframe for BTC pair.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment