Skip to content

Instantly share code, notes, and snippets.

@Canx
Created December 14, 2024 09:23
Show Gist options
  • Save Canx/e8e9b81646db8421657d6aec645e6991 to your computer and use it in GitHub Desktop.
Save Canx/e8e9b81646db8421657d6aec645e6991 to your computer and use it in GitHub Desktop.
Gemini 2 informative_indicators refactoring
import time
import logging
import pandas as pd
import pandas_ta as pta
import numpy as np
from pandas import DataFrame
from typing import Dict
# Module-level logger shared by every method of IndicatorCalculator below.
log = logging.getLogger(__name__)

# Constants for indicator names
#
# Each constant is the dataframe column name under which IndicatorCalculator
# stores an indicator. The numeric suffixes mirror the *default* lengths in
# IndicatorCalculator.indicator_lengths; the column names themselves are fixed
# strings and do not change if those lengths are reconfigured.

# --- RSI and its per-candle derivatives -------------------------------------
RSI_3 = "RSI_3"
RSI_14 = "RSI_14"
RSI_3_CHANGE_PCT = "RSI_3_change_pct"
RSI_14_CHANGE_PCT = "RSI_14_change_pct"
RSI_3_DIFF = "RSI_3_diff"
RSI_14_DIFF = "RSI_14_diff"

# --- Bollinger Bands (the class reads same-named columns from pta.bbands) ----
BBL_20_2 = "BBL_20_2.0"
BBM_20_2 = "BBM_20_2.0"
BBU_20_2 = "BBU_20_2.0"
BBB_20_2 = "BBB_20_2.0"
BBP_20_2 = "BBP_20_2.0"

# --- Volume / range oscillators ---------------------------------------------
MFI_14 = "MFI_14"
CMF_20 = "CMF_20"
WILLR_14 = "WILLR_14"
AROONU_14 = "AROONU_14"
AROOND_14 = "AROOND_14"

# --- Stochastic and Stochastic RSI ------------------------------------------
STOCHK_14_3_3 = "STOCHk_14_3_3"
STOCHD_14_3_3 = "STOCHd_14_3_3"
STOCHRSIK_14_14_3_3 = "STOCHRSIk_14_14_3_3"
STOCHRSID_14_14_3_3 = "STOCHRSId_14_14_3_3"
STOCHRSIK_14_14_3_3_CHANGE_PCT = "STOCHRSIk_14_14_3_3_change_pct"

# --- KST (value line and signal line) ---------------------------------------
KST_10_15_20_30_10_10_10_15 = "KST_10_15_20_30_10_10_10_15"
# NOTE(review): mixed-case name kept as-is because the class references it.
KSTs_9 = "KSTs_9"

# --- Ultimate Oscillator, OBV, ROC, CCI -------------------------------------
UO_7_14_28 = "UO_7_14_28"
UO_7_14_28_CHANGE_PCT = "UO_7_14_28_change_pct"
OBV = "OBV"
OBV_CHANGE_PCT = "OBV_change_pct"
ROC_2 = "ROC_2"
ROC_9 = "ROC_9"
CCI_20 = "CCI_20"
CCI_20_CHANGE_PCT = "CCI_20_change_pct"

# --- Candle shape ------------------------------------------------------------
CHANGE_PCT = "change_pct"
TOP_WICK_PCT = "top_wick_pct"
BOT_WICK_PCT = "bot_wick_pct"

# --- Moving averages ---------------------------------------------------------
# Only EMA_12, EMA_200, SMA_16 and SMA_30 are written by the class as shown
# here; the remaining EMA_* names are not referenced in the visible code.
EMA_12 = "EMA_12"
EMA_200 = "EMA_200"
EMA_3 = "EMA_3"
EMA_9 = "EMA_9"
EMA_16 = "EMA_16"
EMA_20 = "EMA_20"
EMA_26 = "EMA_26"
EMA_50 = "EMA_50"
SMA_16 = "SMA_16"
SMA_30 = "SMA_30"

# --- Long-window Williams %R and base-timeframe extras -----------------------
WILLR_480 = "WILLR_480"
CLOSE_MAX_48 = "close_max_48"
NUM_EMPTY_288 = "num_empty_288"
class IndicatorCalculator:
    """Compute technical-indicator columns on OHLCV dataframes.

    Every ``_calculate_*`` method adds its columns to the dataframe it is
    given (in place) and returns that same dataframe. A failure inside one
    indicator is logged with its traceback and never propagates; the
    indicator's columns are then guaranteed to exist (as NaN where nothing
    was computed) so later column lookups do not raise ``KeyError``.

    Dataframes are expected to carry ``open``, ``high``, ``low``, ``close``
    and ``volume`` columns, since those are the ones the methods read.
    """

    def __init__(self, dp, config, num_cores_indicators_calc, bt_min_age_days, indicator_lengths=None):
        """Store collaborators and build the indicator length table.

        Args:
            dp: Data provider; ``informative_indicators`` calls its
                ``get_pair_dataframe(pair=..., timeframe=...)``.
            config: Mapping whose ``"runmode"`` entry exposes ``.value``
                (read by ``base_tf_5m_indicators``).
            num_cores_indicators_calc: Stored as-is; not used by this class.
            bt_min_age_days: Minimum age in days used by the backtest age
                filter in ``base_tf_5m_indicators``.
            indicator_lengths: Optional mapping overriding individual
                default lengths. Only keys present in the default table are
                accepted. Note that output column names are fixed constants
                and keep their default-length suffixes.

        Raises:
            ValueError: If ``indicator_lengths`` contains an unknown key.
        """
        self.dp = dp
        self.config = config
        self.num_cores_indicators_calc = num_cores_indicators_calc
        self.bt_min_age_days = bt_min_age_days
        # Built as a fresh literal per instance so the list values are never
        # shared between instances.
        self.indicator_lengths = {
            "rsi_short": 3,
            "rsi_long": 14,
            "bbands_length": 20,
            "mfi_length": 14,
            "cmf_length": 20,
            "willr_length": 14,
            "willr_long_length": 480,
            "aroon_length": 14,
            "stoch_length": 14,
            "stochrsi_length": 14,
            "kst_length": [10, 15, 20, 30, 10, 10, 10, 15],
            "kst_signal_length": 9,
            "uo_length": [7, 14, 28],
            "roc_short_length": 2,
            "roc_long_length": 9,
            "cci_length": 20,
            "ema_short": 12,
            "ema_long": 200,
            "ema_3": 3,
            "ema_9": 9,
            "ema_16": 16,
            "ema_20": 20,
            "ema_26": 26,
            "ema_50": 50,
            "sma_16": 16,
            "sma_30": 30,
            "obv_length": 1,
        }
        if indicator_lengths:
            unknown = sorted(set(indicator_lengths) - set(self.indicator_lengths), key=str)
            if unknown:
                raise ValueError(f"Unknown indicator length key(s): {', '.join(map(str, unknown))}")
            self.indicator_lengths.update(indicator_lengths)

    @staticmethod
    def _ensure_columns(df: DataFrame, columns) -> DataFrame:
        """Add each of *columns* missing from *df* as an all-NaN column.

        Columns that already exist are left untouched, so partial results
        computed before a failure are preserved.
        """
        for column in columns:
            if column not in df.columns:
                df[column] = np.nan
        return df

    def _calculate_rsi(self, df: DataFrame) -> DataFrame:
        """Calculates RSI (short and long) and its per-candle derivatives."""
        try:
            log.debug("Calculating %s and %s ...", RSI_3, RSI_14)
            df[RSI_3] = pta.rsi(df["close"], length=self.indicator_lengths["rsi_short"])
            df[RSI_14] = pta.rsi(df["close"], length=self.indicator_lengths["rsi_long"])
            df[RSI_3_CHANGE_PCT] = (df[RSI_3] - df[RSI_3].shift(1)) / (df[RSI_3].shift(1)) * 100.0
            df[RSI_14_CHANGE_PCT] = (df[RSI_14] - df[RSI_14].shift(1)) / (df[RSI_14].shift(1)) * 100.0
            df[RSI_3_DIFF] = df[RSI_3] - df[RSI_3].shift(1)
            df[RSI_14_DIFF] = df[RSI_14] - df[RSI_14].shift(1)
        except Exception as e:
            log.exception("Error calculating RSI: %s", e)
            self._ensure_columns(
                df,
                (RSI_3, RSI_14, RSI_3_CHANGE_PCT, RSI_14_CHANGE_PCT, RSI_3_DIFF, RSI_14_DIFF),
            )
        return df

    def _calculate_ema(self, df: DataFrame) -> DataFrame:
        """Calculates the short and long Exponential Moving Averages."""
        try:
            log.debug("Calculating %s and %s ...", EMA_12, EMA_200)
            df[EMA_12] = pta.ema(df["close"], length=self.indicator_lengths["ema_short"])
            df[EMA_200] = pta.ema(df["close"], length=self.indicator_lengths["ema_long"], fillna=0.0)
        except Exception as e:
            log.exception("Error calculating EMA: %s", e)
            self._ensure_columns(df, (EMA_12, EMA_200))
        return df

    def _calculate_bbands(self, df: DataFrame) -> DataFrame:
        """Calculates Bollinger Bands."""
        targets = (BBL_20_2, BBM_20_2, BBU_20_2, BBB_20_2, BBP_20_2)
        try:
            log.debug("Calculating Bollinger Bands ...")
            length = self.indicator_lengths["bbands_length"]
            bbands = pta.bbands(df["close"], length=length)
            if isinstance(bbands, pd.DataFrame):
                for target, prefix in zip(targets, ("BBL", "BBM", "BBU", "BBB", "BBP")):
                    df[target] = bbands[f"{prefix}_{length}_2.0"]
            else:
                # A non-DataFrame result means nothing was computed.
                for target in targets:
                    df[target] = np.nan
        except Exception as e:
            log.exception("Error calculating Bollinger Bands: %s", e)
            self._ensure_columns(df, targets)
        return df

    def _calculate_mfi(self, df: DataFrame) -> DataFrame:
        """Calculates Money Flow Index."""
        try:
            log.debug("Calculating %s ...", MFI_14)
            df[MFI_14] = pta.mfi(
                df["high"], df["low"], df["close"], df["volume"],
                length=self.indicator_lengths["mfi_length"],
            )
        except Exception as e:
            log.exception("Error calculating MFI: %s", e)
            self._ensure_columns(df, (MFI_14,))
        return df

    def _calculate_cmf(self, df: DataFrame) -> DataFrame:
        """Calculates Chaikin Money Flow."""
        try:
            log.debug("Calculating %s ...", CMF_20)
            df[CMF_20] = pta.cmf(
                df["high"], df["low"], df["close"], df["volume"],
                length=self.indicator_lengths["cmf_length"],
            )
        except Exception as e:
            log.exception("Error calculating CMF: %s", e)
            self._ensure_columns(df, (CMF_20,))
        return df

    def _calculate_willr(self, df: DataFrame) -> DataFrame:
        """Calculates Williams %R (short window, plus long window if configured)."""
        has_long = "willr_long_length" in self.indicator_lengths
        try:
            log.debug("Calculating %s ...", WILLR_14)
            df[WILLR_14] = pta.willr(
                df["high"], df["low"], df["close"],
                length=self.indicator_lengths["willr_length"],
            )
            if has_long:
                log.debug("Calculating %s ...", WILLR_480)
                df[WILLR_480] = pta.willr(
                    df["high"], df["low"], df["close"],
                    length=self.indicator_lengths["willr_long_length"],
                )
        except Exception as e:
            log.exception("Error calculating Williams %%R: %s", e)
            self._ensure_columns(df, (WILLR_14, WILLR_480) if has_long else (WILLR_14,))
        return df

    def _calculate_aroon(self, df: DataFrame) -> DataFrame:
        """Calculates Aroon up/down."""
        try:
            log.debug("Calculating Aroon ...")
            length = self.indicator_lengths["aroon_length"]
            aroon = pta.aroon(df["high"], df["low"], length=length)
            if isinstance(aroon, pd.DataFrame):
                df[AROONU_14] = aroon[f"AROONU_{length}"]
                df[AROOND_14] = aroon[f"AROOND_{length}"]
            else:
                df[AROONU_14] = np.nan
                df[AROOND_14] = np.nan
        except Exception as e:
            log.exception("Error calculating Aroon: %s", e)
            self._ensure_columns(df, (AROONU_14, AROOND_14))
        return df

    def _calculate_stochastic(self, df: DataFrame) -> DataFrame:
        """Calculates Stochastic %K / %D."""
        try:
            log.debug("Calculating Stochastic ...")
            k = self.indicator_lengths["stoch_length"]
            # Pass k explicitly: the result columns are looked up by this
            # length, so the call and the lookup must agree.
            stoch = pta.stoch(df["high"], df["low"], df["close"], k=k)
            if isinstance(stoch, pd.DataFrame):
                df[STOCHK_14_3_3] = stoch[f"STOCHk_{k}_3_3"]
                df[STOCHD_14_3_3] = stoch[f"STOCHd_{k}_3_3"]
            else:
                df[STOCHK_14_3_3] = np.nan
                df[STOCHD_14_3_3] = np.nan
        except Exception as e:
            log.exception("Error calculating Stochastic: %s", e)
            df[STOCHK_14_3_3] = np.nan
            df[STOCHD_14_3_3] = np.nan
        return df

    def _calculate_stochrsi(self, df: DataFrame) -> DataFrame:
        """Calculates Stochastic RSI and the %K per-candle percentage change."""
        try:
            log.debug("Calculating Stochastic RSI...")
            length = self.indicator_lengths["stochrsi_length"]
            # rsi_length is passed too because the result column name embeds
            # both lengths and the lookup below uses `length` for each.
            stochrsi = pta.stochrsi(df["close"], length=length, rsi_length=length)
            if isinstance(stochrsi, pd.DataFrame):
                df[STOCHRSIK_14_14_3_3] = stochrsi[f"STOCHRSIk_{length}_{length}_3_3"]
                df[STOCHRSID_14_14_3_3] = stochrsi[f"STOCHRSId_{length}_{length}_3_3"]
                previous_k = df[STOCHRSIK_14_14_3_3].shift(1)
                df[STOCHRSIK_14_14_3_3_CHANGE_PCT] = (
                    (df[STOCHRSIK_14_14_3_3] - previous_k) / previous_k
                ) * 100.0
            else:
                df[STOCHRSIK_14_14_3_3] = np.nan
                df[STOCHRSID_14_14_3_3] = np.nan
                df[STOCHRSIK_14_14_3_3_CHANGE_PCT] = np.nan
        except Exception as e:
            log.exception("Error calculating Stochastic RSI: %s", e)
            df[STOCHRSIK_14_14_3_3] = np.nan
            df[STOCHRSID_14_14_3_3] = np.nan
            df[STOCHRSIK_14_14_3_3_CHANGE_PCT] = np.nan
        return df

    def _calculate_kst(self, df: DataFrame) -> DataFrame:
        """Calculates KST and its signal line."""
        try:
            log.debug("Calculating KST ...")
            lengths = self.indicator_lengths["kst_length"]
            signal = self.indicator_lengths["kst_signal_length"]
            # The eight lengths are passed positionally; signal is passed
            # explicitly so it matches the "KSTs_<signal>" lookup below.
            kst = pta.kst(df["close"], *lengths, signal=signal)
            if isinstance(kst, pd.DataFrame):
                df[KST_10_15_20_30_10_10_10_15] = kst[f"KST_{'_'.join(map(str, lengths))}"]
                df[KSTs_9] = kst[f"KSTs_{signal}"]
            else:
                df[KST_10_15_20_30_10_10_10_15] = np.nan
                df[KSTs_9] = np.nan
        except Exception as e:
            log.exception("Error calculating KST: %s", e)
            df[KST_10_15_20_30_10_10_10_15] = np.nan
            df[KSTs_9] = np.nan
        return df

    def _calculate_uo(self, df: DataFrame) -> DataFrame:
        """Calculates Ultimate Oscillator and its per-candle percentage change."""
        try:
            log.debug("Calculating UO ...")
            uo_lengths = self.indicator_lengths["uo_length"]
            # pandas_ta names these periods fast/medium/slow; other keyword
            # names are swallowed by **kwargs and silently ignored.
            df[UO_7_14_28] = pta.uo(
                df["high"], df["low"], df["close"],
                fast=uo_lengths[0],
                medium=uo_lengths[1],
                slow=uo_lengths[2],
            )
            # Missing values are replaced by 50.0 before deriving the change.
            df[UO_7_14_28] = df[UO_7_14_28].astype(np.float64).replace(to_replace=[np.nan, None], value=(50.0))
            previous_uo = df[UO_7_14_28].shift(1)
            df[UO_7_14_28_CHANGE_PCT] = ((df[UO_7_14_28] - previous_uo) / abs(previous_uo)) * 100.0
        except Exception as e:
            log.exception("Error calculating UO: %s", e)
            df[UO_7_14_28] = np.nan
            df[UO_7_14_28_CHANGE_PCT] = np.nan
        return df

    def _calculate_obv(self, df: DataFrame) -> DataFrame:
        """Calculates On Balance Volume and its per-candle percentage change."""
        try:
            log.debug("Calculating OBV ...")
            df[OBV] = pta.obv(df["close"], df["volume"])
            previous_obv = df[OBV].shift(1)
            df[OBV_CHANGE_PCT] = ((df[OBV] - previous_obv) / abs(previous_obv)) * 100.0
        except Exception as e:
            log.exception("Error calculating OBV: %s", e)
            self._ensure_columns(df, (OBV, OBV_CHANGE_PCT))
        return df

    def _calculate_roc(self, df: DataFrame) -> DataFrame:
        """Calculates Rate of Change (short and long)."""
        try:
            log.debug("Calculating ROC ...")
            df[ROC_2] = pta.roc(df["close"], length=self.indicator_lengths["roc_short_length"])
            df[ROC_9] = pta.roc(df["close"], length=self.indicator_lengths["roc_long_length"])
        except Exception as e:
            log.exception("Error calculating ROC: %s", e)
            self._ensure_columns(df, (ROC_2, ROC_9))
        return df

    def _calculate_cci(self, df: DataFrame) -> DataFrame:
        """Calculates Commodity Channel Index and its per-candle percentage change."""
        try:
            log.debug("Calculating CCI ...")
            df[CCI_20] = pta.cci(
                df["high"], df["low"], df["close"],
                length=self.indicator_lengths["cci_length"],
            )
            # Missing values are replaced by 0.0 before deriving the change.
            df[CCI_20] = (df[CCI_20]).astype(np.float64).replace(to_replace=[np.nan, None], value=(0.0))
            previous_cci = df[CCI_20].shift(1)
            df[CCI_20_CHANGE_PCT] = ((df[CCI_20] - previous_cci) / abs(previous_cci)) * 100.0
        except Exception as e:
            log.exception("Error calculating CCI: %s", e)
            df[CCI_20] = np.nan
            df[CCI_20_CHANGE_PCT] = np.nan
        return df

    def _calculate_candle_change(self, df: DataFrame) -> DataFrame:
        """Calculates candle body change and top/bottom wick sizes, in percent."""
        try:
            log.debug("Calculating candle change and wicks...")
            body_top = np.maximum(df["open"], df["close"])
            body_bottom = np.minimum(df["open"], df["close"])
            df[CHANGE_PCT] = (df["close"] - df["open"]) / df["open"] * 100.0
            df[TOP_WICK_PCT] = (df["high"] - body_top) / body_top * 100.0
            df[BOT_WICK_PCT] = abs((df["low"] - body_bottom) / body_bottom * 100.0)
        except Exception as e:
            log.exception("Error calculating candle changes: %s", e)
            self._ensure_columns(df, (CHANGE_PCT, TOP_WICK_PCT, BOT_WICK_PCT))
        return df

    def _calculate_sma(self, df: DataFrame) -> DataFrame:
        """Calculates Simple Moving Averages."""
        try:
            log.debug("Calculating SMA...")
            df[SMA_16] = pta.sma(df["close"], length=self.indicator_lengths["sma_16"])
            df[SMA_30] = pta.sma(df["close"], length=self.indicator_lengths["sma_30"])
        except Exception as e:
            log.exception("Error calculating SMA: %s", e)
            self._ensure_columns(df, (SMA_16, SMA_30))
        return df

    def calculate_indicators(self, metadata: Dict, df: DataFrame) -> DataFrame:
        """Calculates all shared indicators on *df* and returns it.

        Args:
            metadata: Mapping with a ``"pair"`` entry (used for logging).
            df: Dataframe the indicator columns are added to.
        """
        tik = time.perf_counter()
        # Order is kept stable so the resulting column order is stable.
        steps = (
            self._calculate_rsi,
            self._calculate_ema,
            self._calculate_bbands,
            self._calculate_mfi,
            self._calculate_cmf,
            self._calculate_willr,
            self._calculate_aroon,
            self._calculate_stochastic,
            self._calculate_stochrsi,
            self._calculate_kst,
            self._calculate_uo,
            self._calculate_obv,
            self._calculate_roc,
            self._calculate_cci,
            self._calculate_candle_change,
            self._calculate_sma,
        )
        for step in steps:
            df = step(df)
        tok = time.perf_counter()
        log.debug(f"[{metadata['pair']}] indicators took: {tok - tik:0.4f} seconds.")
        return df

    def informative_indicators(self, metadata: Dict, info_timeframe: str) -> DataFrame:
        """Retrieves the pair's dataframe for *info_timeframe* and adds indicators.

        Raises:
            AssertionError: If no data provider was supplied.
        """
        tik = time.perf_counter()
        # Explicit raise instead of `assert` so the check survives `python -O`.
        if not self.dp:
            raise AssertionError("DataProvider is required for multiple timeframes.")
        informative_df = self.dp.get_pair_dataframe(pair=metadata["pair"], timeframe=info_timeframe)
        informative_df = self.calculate_indicators(metadata, informative_df)
        tok = time.perf_counter()
        log.debug(f"[{metadata['pair']}] informative_indicators took: {tok - tik:0.4f} seconds.")
        return informative_df

    def base_tf_5m_indicators(self, metadata: Dict, df: DataFrame) -> DataFrame:
        """Calculates base timeframe indicators plus global protection columns.

        Adds the shared indicators, a 48-candle rolling close maximum, a
        288-candle count of zero-volume candles, and either
        ``bt_agefilter_ok`` (outside live/dry-run) or ``live_data_ok``
        (live/dry-run).
        """
        tik = time.perf_counter()
        df = self.calculate_indicators(metadata, df)
        # Close max
        df[CLOSE_MAX_48] = df["close"].rolling(48).max()
        # Number of empty candles
        df[NUM_EMPTY_288] = (df["volume"] <= 0).rolling(window=288, min_periods=288).sum()
        # -----------------------------------------------------------------------------------------
        # Global protections
        # -----------------------------------------------------------------------------------------
        if self.config["runmode"].value not in ("live", "dry_run"):
            # Backtest age filter: 12 * 24 is the number of 5-minute candles
            # in a day. NOTE(review): the comparison is against df.index, so
            # this presumably expects a 0-based integer index - confirm.
            df["bt_agefilter_ok"] = False
            df.loc[df.index > (12 * 24 * self.bt_min_age_days), "bt_agefilter_ok"] = True
        else:
            # Exchange downtime protection: True only when none of the last
            # 72 candles had zero volume.
            df["live_data_ok"] = df["volume"].rolling(window=72, min_periods=72).min() > 0
        # Performance logging
        # -----------------------------------------------------------------------------------------
        tok = time.perf_counter()
        log.debug(f"[{metadata['pair']}] base_tf_5m_indicators took: {tok - tik:0.4f} seconds.")
        return df

    def info_switcher(self, metadata: Dict, info_timeframe: str) -> DataFrame:
        """Routes to the informative indicator function for a supported timeframe.

        Raises:
            RuntimeError: If *info_timeframe* is not one of 1d, 4h, 1h, 15m.
        """
        if info_timeframe not in ("1d", "4h", "1h", "15m"):
            raise RuntimeError(f"{info_timeframe} not supported as informative timeframe for BTC pair.")
        return self.informative_indicators(metadata, info_timeframe)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment