Last active
May 5, 2024 04:37
-
-
Save normanlmfung/adc0e396443f1a589937814a2aa973e4 to your computer and use it in GitHub Desktop.
ts_slicer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import json | |
from datetime import datetime | |
import logging | |
import arrow | |
from typing import Dict, List | |
import enum | |
import math | |
import pandas as pd | |
import statsmodels.api as sm # in-compatible with pypy | |
from hurst import compute_Hc # compatible with pypy | |
from scipy.stats import linregress | |
import numpy as np | |
import seaborn as sns | |
import matplotlib.pyplot as plt | |
import matplotlib.dates as mdates | |
from ccxt.binance import binance | |
from ccxt.okex5 import okex5 | |
from ccxt.bybit import bybit | |
# For performance profiling | |
import cProfile | |
from pstats import Stats, SortKey | |
import random | |
import hashlib | |
import time | |
sys.path.append('./gizmo') | |
from market_data_gizmo import fetch_candles | |
base_dir : str = f"{os.path.dirname(sys.path[0])}\\ts_slicer" | |
REPORT_NAME : str = "ts_slicer" | |
CACHE_CANDLES : str = f"{os.path.dirname(sys.path[0])}\\casin0_bt\\cache\\research\\candles" | |
num_candles_limit : int = 100 # Depends on exchange but generally 100 ok! | |
param = { | |
'apiKey' : None, | |
'secret' : None, | |
'password' : None, # Other exchanges dont require this! This is saved in exchange.password! | |
'subaccount' : None, | |
'rateLimit' : 100, # In ms | |
'options' : { | |
'defaultType': 'swap', # Should test linear instead | |
'leg_room_bps' : 5, | |
'trade_fee_bps' : 3 | |
} | |
} | |
ticker : str = 'BTC/USDT:USDT' | |
dt_start : datetime = datetime(2024,1,1) | |
dt_end : datetime = datetime(2024,4,11) | |
ts_candle_size : str = '1h' | |
''' | |
Length of sliding window, relative to overall window size between dt_start and dt_end. | |
Example, dt_start = 20200101, dt_end = 20240101. So total 4 yr. | |
If ts_how_many_candles = 4 yr (1460d), sliding_window_how_many_candles = 365 * 3/12 = 91.25 days: sliding_window_ratio = 1460/91.25 = 16 | |
So, '16' is my fav. | |
''' | |
sliding_window_ratio = 16 | |
ts_how_many_candles : int = 0 | |
total_seconds = (dt_end - dt_start).total_seconds() | |
total_hours = total_seconds / 3600 | |
total_days = total_hours / 24 | |
sliding_window_how_many_candles : int = 0 | |
if ts_candle_size == '1h': | |
sliding_window_how_many_candles = int(total_hours / sliding_window_ratio) | |
elif ts_candle_size == '1d': | |
sliding_window_how_many_candles = int(total_days / sliding_window_ratio) | |
smoothing_window_size_ratio : int = 3 | |
boillenger_std_multiples : float = 1 | |
linregress_stderr_threshold : float = 10 | |
max_recur_depth : int = 2 | |
min_segment_size_how_many_candles : int = 15 | |
segment_consolidate_slope_ratio_threshold : float = 2 | |
sideway_price_condition_threshold : float = 0.05 # i.e. Price if stay within 5% between start and close it's considered 'Sideway' market. | |
# atm, only support one single exchange | |
target_exchange = okex5(param) | |
target_exchange.name='okx_linear' | |
full_report_name : str = f"{REPORT_NAME}_" | |
logging.Formatter.converter = time.gmtime | |
logger = logging.getLogger(REPORT_NAME) | |
log_level = logging.INFO # DEBUG --> INFO --> WARNING --> ERROR | |
logger.setLevel(log_level) | |
format_str = '%(asctime)s %(message)s' | |
formatter = logging.Formatter(format_str) | |
sh = logging.StreamHandler() | |
sh.setLevel(log_level) | |
sh.setFormatter(formatter) | |
logger.addHandler(sh) | |
fh = logging.FileHandler(f"{full_report_name}.log") | |
fh.setLevel(log_level) | |
fh.setFormatter(formatter) | |
logger.addHandler(fh) | |
def log(message : str): | |
if not isinstance(message, Dict): | |
logger.info(message) | |
else: | |
logger.info(json.dumps(message, indent=4)) | |
''' | |
The implementation from Geeksforgeeks https://www.geeksforgeeks.org/find-indices-of-all-local-maxima-and-local-minima-in-an-array/ is wrong. | |
If you have consecutive-duplicates, things will gall apart! | |
Example 1: values = [ 1, 2, 3, 7, 11, 15, 13, 12, 11, 6, 5, 7, 11, 8] | |
The default implementation correctly identify "15" as a peak. | |
Example 2: values = [ 1, 2, 3, 7, 11, 15, 15, 13, 12, 11, 6, 5, 7, 11, 8] | |
The default implementation will mark "11" as local maxima because there are two consecutive 15's. | |
Fix: https://stackoverflow.com/questions/75013708/python-finding-local-minima-and-maxima?noredirect=1#comment132376733_75013708 | |
''' | |
def find_local_max_min(values: List[float], merge_distance: int = 5): | |
mx = [] | |
mn = [] | |
n = len(values) | |
if n < 2: | |
return None | |
if values[0] > values[1]: | |
mn.append(0) | |
elif values[0] < values[1]: | |
mx.append(0) | |
for i in range(1, n-1): | |
if all(values[i] >= values[j] for j in range(i-10, i+11) if 0 <= j < n): | |
mx.append(i) | |
elif all(values[i] <= values[j] for j in range(i-10, i+11) if 0 <= j < n): | |
mn.append(i) | |
if values[-1] > values[-2]: | |
mx.append(n-1) | |
elif values[-1] < values[-2]: | |
mn.append(n-1) | |
# Merge nearby maxima and minima | |
mx_merged = [] | |
mn_merged = [] | |
def merge_nearby_points(points): | |
merged = [] | |
start = points[0] | |
for i in range(1, len(points)): | |
if points[i] - start > merge_distance: | |
merged.append(start + (points[i-1] - start) // 2) # Take the middle point | |
start = points[i] | |
merged.append(start + (points[-1] - start) // 2) # Take the middle point for the last segment | |
return merged | |
mx_merged = merge_nearby_points(mx) | |
mn_merged = merge_nearby_points(mn) | |
return { | |
'local_max': mx_merged, | |
'local_min': mn_merged | |
} | |
def fix_column_types(pd_candles : pd.DataFrame): | |
pd_candles['open'] = pd_candles['open'].astype(float) | |
pd_candles['high'] = pd_candles['high'].astype(float) | |
pd_candles['low'] = pd_candles['low'].astype(float) | |
pd_candles['close'] = pd_candles['close'].astype(float) | |
pd_candles['volume'] = pd_candles['volume'].astype(float) | |
pd_candles['datetime'] = pd.to_datetime(pd_candles['datetime']) | |
# This is to make it easy to do grouping with Excel pivot table | |
pd_candles['year'] = pd_candles['datetime'].dt.year | |
pd_candles['month'] = pd_candles['datetime'].dt.month | |
pd_candles['day'] = pd_candles['datetime'].dt.day | |
pd_candles['hour'] = pd_candles['datetime'].dt.hour | |
pd_candles['minute'] = pd_candles['datetime'].dt.minute | |
# Fibonacci | |
MAGIC_FIB_LEVELS = [0, 0.236, 0.382, 0.5, 0.618, 0.786, 1.00, 1.618, 2.618, 3.618, 4.236] | |
def estimate_fib_retracement( | |
swing_low: float, | |
swing_low_idx: int, | |
swing_high: float, | |
swing_high_idx: int, | |
target_fib_level: float = 0.618 | |
): | |
price_range = swing_high - swing_low | |
# https://blog.quantinsti.com/fibonacci-retracement-trading-strategy-python/ | |
if swing_low_idx < swing_high_idx: | |
retracement_price = swing_high - (price_range * target_fib_level) | |
else: | |
retracement_price = swing_low + (price_range * target_fib_level) | |
return retracement_price | |
''' | |
compute_candles_stats will calculate typical/basic technical indicators using in many trading strategies: | |
a. Basic SMA/EMAs (And slopes) | |
b. ATR | |
c. Hurst Exponent | |
d. RSI | |
e. MACD | |
f. Fibonacci | |
g. Inflections points: where 'close' crosses EMA from above or below. | |
Parameters: | |
a. boillenger_std_multiples: For boillenger upper and lower calc | |
b. sliding_window_how_many_candles: Moving averages calculation | |
c. rsi_ema: RSI calculated using EMA or SMA? | |
d. boillenger_ema: Boillenger calculated using SMA or EMA? | |
e. slow_fast_interval_ratios | |
MACD calculated using two moving averages. | |
Slow line using 'sliding_window_how_many_candles' intervals. | |
Fast line using 'sliding_window_how_many_candles/slow_fast_interval_ratios' intervals. | |
Example, | |
if Slow line is calculated using 24 candles and short_long_interval_ratios = 3, | |
then Fast line is calculated using 24/3 = 8 candles. | |
''' | |
def compute_candles_stats( | |
pd_candles : pd.DataFrame, | |
boillenger_std_multiples : float, | |
sliding_window_how_many_candles : int, | |
rsi_ema : bool = True, | |
boillenger_ema : bool = False, | |
slow_fast_interval_ratio : float = 3 | |
): | |
fix_column_types(pd_candles) | |
pd_candles['candle_height'] = pd_candles['high'] - pd_candles['low'] | |
''' | |
market_data_gizmo inserted dummy lines --> Need exclude those or "TypeError: unorderable types for comparison": pd_btc_candles = pd_btc_candles[pd_btc_candles.close.notnull()] | |
pd_btc_candles.loc[ | |
(pd_btc_candles['close_above_or_below_ema'] != pd_btc_candles['close_above_or_below_ema'].shift(1)) & | |
(abs(pd_btc_candles['gap_close_vs_ema']) > avg_gap_close_vs_ema), | |
'close_vs_ema_inflection' | |
] = np.sign(pd_btc_candles['close'] - pd_btc_candles['ema_long_periods']) <-- TypeError: unorderable types for comparison | |
''' | |
# pd_candles = pd_candles[pd_candles.close.notnull()] # Don't make a copy. Drop in-place | |
pd_candles.drop(pd_candles[pd_candles.close.isnull()].index, inplace=True) | |
pd_candles['pct_change_close'] = pd_candles['close'].pct_change() * 100 | |
pd_candles['sma_short_periods'] = pd_candles['close'].rolling(window=int(sliding_window_how_many_candles/slow_fast_interval_ratio)).mean() | |
pd_candles['sma_long_periods'] = pd_candles['close'].rolling(window=sliding_window_how_many_candles).mean() | |
pd_candles['ema_short_periods'] = pd_candles['close'].ewm(span=int(sliding_window_how_many_candles/slow_fast_interval_ratio), adjust=False).mean() | |
pd_candles['ema_long_periods'] = pd_candles['close'].ewm(span=sliding_window_how_many_candles, adjust=False).mean() | |
pd_candles['ema_close'] = pd_candles['ema_long_periods'] # Alias, shorter name | |
pd_candles['std'] = pd_candles['close'].rolling(window=sliding_window_how_many_candles).std() | |
pd_candles['max_short_periods'] = pd_candles['close'].rolling(window=int(sliding_window_how_many_candles/slow_fast_interval_ratio)).max() | |
pd_candles['max_long_periods'] = pd_candles['close'].rolling(window=sliding_window_how_many_candles).max() | |
pd_candles['idmax_short_periods'] = pd_candles['close'].rolling(window=int(sliding_window_how_many_candles/slow_fast_interval_ratio)).apply(lambda x : x.idxmax()) | |
pd_candles['idmax_long_periods'] = pd_candles['close'].rolling(window=sliding_window_how_many_candles).apply(lambda x : x.idxmax()) | |
pd_candles['min_short_periods'] = pd_candles['close'].rolling(window=int(sliding_window_how_many_candles/slow_fast_interval_ratio)).min() | |
pd_candles['min_long_periods'] = pd_candles['close'].rolling(window=sliding_window_how_many_candles).min() | |
pd_candles['idmin_short_periods'] = pd_candles['close'].rolling(window=int(sliding_window_how_many_candles/slow_fast_interval_ratio)).apply(lambda x : x.idxmin()) | |
pd_candles['idmin_long_periods'] = pd_candles['close'].rolling(window=sliding_window_how_many_candles).apply(lambda x : x.idxmin()) | |
# ATR https://medium.com/codex/detecting-ranging-and-trending-markets-with-choppiness-index-in-python-1942e6450b58 | |
pd_candles.loc[:,'h_l'] = pd_candles['high'] - pd_candles['low'] | |
pd_candles.loc[:,'h_pc'] = abs(pd_candles['high'] - pd_candles['close'].shift(1)) | |
pd_candles.loc[:,'l_pc'] = abs(pd_candles['low'] - pd_candles['close'].shift(1)) | |
pd_candles.loc[:,'tr'] = pd_candles[['h_l', 'h_pc', 'l_pc']].max(axis=1) | |
pd_candles.loc[:,'atr'] = pd_candles['tr'].rolling(window=sliding_window_how_many_candles).mean() | |
# Hurst https://towardsdatascience.com/introduction-to-the-hurst-exponent-with-code-in-python-4da0414ca52e | |
pd_candles['hurst_exp'] = pd_candles['close'].rolling(window=120).apply(lambda x: compute_Hc(x, kind='price', simplified=True)[0]) | |
# Boillenger https://www.quantifiedstrategies.com/python-bollinger-band-trading-strategy/ | |
pd_candles.loc[:,'boillenger_upper'] = (pd_candles['sma_long_periods'] if not boillenger_ema else pd_candles['ema_long_periods']) + pd_candles['std'] * boillenger_std_multiples | |
pd_candles.loc[:,'boillenger_lower'] = (pd_candles['sma_long_periods'] if not boillenger_ema else pd_candles['ema_long_periods']) - pd_candles['std'] * boillenger_std_multiples | |
pd_candles.loc[:,'boillenger_channel_height'] = pd_candles['boillenger_upper'] - pd_candles['boillenger_lower'] | |
# RSI - https://www.youtube.com/watch?v=G9oUTi-PI18&t=809s | |
pd_candles.loc[:,'close_delta'] = pd_candles['close'].diff() | |
lo_up = pd_candles['close_delta'].clip(lower=0) | |
lo_down = -1 * pd_candles['close_delta'].clip(upper=0) | |
pd_candles.loc[:,'up'] = lo_up | |
pd_candles.loc[:,'down'] = lo_down | |
if rsi_ema == True: | |
# Use exponential moving average | |
lo_ma_up = lo_up.ewm(com = sliding_window_how_many_candles - 1, adjust=True, min_periods = sliding_window_how_many_candles).mean() | |
lo_ma_down = lo_down.ewm(com = sliding_window_how_many_candles - 1, adjust=True, min_periods = sliding_window_how_many_candles).mean() | |
else: | |
# Use simple moving average | |
lo_ma_up = lo_up.rolling(window = sliding_window_how_many_candles, adjust=False).mean() | |
lo_ma_down = lo_down.rolling(window = sliding_window_how_many_candles, adjust=False).mean() | |
lo_rs = lo_ma_up / lo_ma_down | |
pd_candles.loc[:,'rsi'] = 100 - (100/(1 + lo_rs)) | |
# MACD https://www.investopedia.com/terms/m/macd.asp | |
pd_candles['macd'] = pd_candles['ema_short_periods'] - pd_candles['ema_long_periods'] | |
pd_candles['signal'] = pd_candles['macd'].ewm(span=9, adjust=False).mean() | |
pd_candles['macd_minus_signal'] = pd_candles['macd'] - pd_candles['signal'] | |
# Slope on EMA | |
X = sm.add_constant(range(len(pd_candles['ema_short_periods']))) | |
rolling_slope = pd_candles['ema_short_periods'].rolling(window=int(sliding_window_how_many_candles/slow_fast_interval_ratio)).apply(lambda x: sm.OLS(x, X[:len(x)]).fit().params[1], raw=False) | |
pd_candles['ema_short_slope'] = rolling_slope | |
X = sm.add_constant(range(len(pd_candles['ema_long_periods']))) | |
rolling_slope = pd_candles['ema_long_periods'].rolling(window=sliding_window_how_many_candles).apply(lambda x: sm.OLS(x, X[:len(x)]).fit().params[1], raw=False) | |
pd_candles['ema_long_slope'] = rolling_slope | |
# Fibonacci | |
TARGET_FIB_LEVEL = 0.618 | |
pd_candles['fib_618_short_periods'] = pd_candles.apply(lambda rw : estimate_fib_retracement(rw['min_short_periods'], rw['idmin_short_periods'], rw['max_short_periods'], rw['idmax_short_periods'], TARGET_FIB_LEVEL), axis=1) | |
pd_candles['fib_618_long_periods'] = pd_candles.apply(lambda rw : estimate_fib_retracement(rw['min_long_periods'], rw['idmin_long_periods'], rw['max_long_periods'], rw['idmax_long_periods'], TARGET_FIB_LEVEL), axis=1) | |
# Inflection points | |
pd_candles['gap_close_vs_ema'] = pd_candles['close'] - pd_candles['ema_long_periods'] | |
pd_candles['close_above_or_below_ema'] = None | |
pd_candles.loc[pd_candles['gap_close_vs_ema'] > 0, 'close_above_or_below_ema'] = 'above' | |
pd_candles.loc[pd_candles['gap_close_vs_ema'] < 0, 'close_above_or_below_ema'] = 'below' | |
pd_candles.loc[ | |
(pd_candles['close_above_or_below_ema'] != pd_candles['close_above_or_below_ema'].shift(-1)), | |
'close_vs_ema_inflection' | |
] = np.sign(pd_candles['close'] - pd_candles['ema_long_periods']) | |
def partition_sliding_window( | |
pd_candles : pd.DataFrame, | |
sliding_window_how_many_candles : int, | |
smoothing_window_size_ratio : int, | |
linregress_stderr_threshold : float, | |
max_recur_depth : int, | |
min_segment_size_how_many_candles : int, | |
segment_consolidate_slope_ratio_threshold : float, | |
sideway_price_condition_threshold | |
) -> List[Dict]: | |
window_size = int(sliding_window_how_many_candles/smoothing_window_size_ratio) | |
# window_size = 8 # @hack | |
smoothed_colse = pd.Series(pd_candles['close']).rolling(window=window_size, min_periods=window_size).mean() | |
pd_candles['smoothed_close'] = smoothed_colse | |
maxima_minima = find_local_max_min(values = pd_candles['close'].to_list(), merge_distance=1) # @CRITICAL close vs smoothed_close and merge_distance | |
maxima = maxima_minima['local_max'] | |
minima = maxima_minima['local_min'] | |
maxima = [x for x in maxima if x>=pd_candles.index.min()] | |
minima = [x for x in minima if x>=pd_candles.index.min()] | |
pd_candles['maxima'] = False | |
pd_candles['minima'] = False | |
pd_candles.loc[maxima, 'maxima'] = True | |
pd_candles.loc[minima, 'minima'] = True | |
inflection_points = pd_candles[(pd_candles.close_vs_ema_inflection == 1) | (pd_candles.close_vs_ema_inflection == -1)].index.tolist() | |
inflection_points = [ index-1 for index in inflection_points ] | |
if (pd_candles.shape[0]-1) not in inflection_points: | |
inflection_points.append(pd_candles.shape[0]-1) | |
last_point = inflection_points[0] | |
sparse_inflection_points = [ last_point ] | |
for point in inflection_points: | |
if (point not in sparse_inflection_points) and ((point-last_point)>min_segment_size_how_many_candles): | |
sparse_inflection_points.append(point) | |
last_point = point | |
inflection_points = sparse_inflection_points | |
def _compute_new_segment( | |
pd_candles : pd.DataFrame, | |
start_index : int, | |
end_index : int, | |
cur_recur_depth : int, | |
linregress_stderr_threshold : float = 50, | |
max_recur_depth : int = 2, | |
min_segment_size_how_many_candles : int = 15 | |
) -> List[Dict]: | |
new_segments : List[Dict] = None | |
if end_index>pd_candles.shape[0]-1: | |
end_index = pd_candles.shape[0]-1 | |
start_upper = pd_candles.iloc[start_index]['boillenger_upper'] | |
end_upper = pd_candles.iloc[end_index]['boillenger_upper'] | |
start_lower = pd_candles.iloc[start_index]['boillenger_lower'] | |
end_lower = pd_candles.iloc[end_index]['boillenger_lower'] | |
start_datetime = pd_candles.iloc[start_index]['datetime'] | |
end_datetime = pd_candles.iloc[end_index]['datetime'] | |
start_close = pd_candles.iloc[start_index]['close'] | |
end_close = pd_candles.iloc[end_index]['close'] | |
# Using Boillenger upper and lower only | |
maxima_idx_boillenger = [ start_index, end_index ] | |
maxima_close_boillenger = [ start_upper, end_upper ] | |
minima_idx_boillenger = [ start_index, end_index ] | |
minima_close_boillenger = [ start_lower, end_lower ] | |
maxima_linregress_boillenger = linregress(maxima_idx_boillenger, maxima_close_boillenger) | |
minima_linregress_boillenger = linregress(minima_idx_boillenger, minima_close_boillenger) | |
# Using Boillenger upper and lower AND Local maxima/minima | |
maxima_idx_full = [start_index] + [ x for x in maxima if x>=start_index+1 and x<end_index ] + [end_index] | |
maxima_close_full = [ start_upper if not math.isnan(start_upper) else start_close ] + [ pd_candles.loc[x]['close'] for x in maxima if x>start_index and x<end_index ] + [ end_upper ] | |
minima_idx_full = [start_index] + [ x for x in minima if x>=start_index+1 and x<end_index ] + [end_index] | |
minima_close_full = [ start_lower if not math.isnan(start_lower) else start_close ] + [ pd_candles.loc[x]['close'] for x in minima if x>start_index and x<end_index ] + [ end_lower ] | |
maxima_linregress_full = linregress(maxima_idx_full, maxima_close_full) | |
minima_linregress_full = linregress(minima_idx_full, minima_close_full) | |
largest_candle_index = pd_candles.iloc[start_index:end_index,:]['candle_height'].idxmax() | |
if ( | |
(abs(maxima_linregress_full.stderr) < linregress_stderr_threshold and abs(minima_linregress_full.stderr) < linregress_stderr_threshold) | |
or cur_recur_depth>=max_recur_depth | |
or (start_index==largest_candle_index or end_index==largest_candle_index+1) | |
or ( | |
(end_index-largest_candle_index < min_segment_size_how_many_candles) | |
or (largest_candle_index - start_index < min_segment_size_how_many_candles) | |
) | |
): | |
new_segment = { | |
'start' : start_index, | |
'end' : end_index, | |
'start_datetime' : start_datetime, | |
'end_datetime' : end_datetime, | |
'start_close' : start_close, | |
'end_close' : end_close, | |
'window_size_num_intervals' : end_index - start_index, | |
'cur_recur_depth' : cur_recur_depth, | |
'up_or_down' : 'up' if end_close>=start_close else 'down', | |
'maxima_idx_boillenger' : maxima_idx_boillenger, | |
'maxima_close_boillenger' : maxima_close_boillenger, | |
'minima_idx_boillenger' : minima_idx_boillenger, | |
'minima_close_boillenger' : minima_close_boillenger, | |
'maxima_linregress_boillenger' : maxima_linregress_boillenger, | |
'minima_linregress_boillenger' : minima_linregress_boillenger, | |
'maxima_linregress_full' : maxima_linregress_full, | |
'minima_linregress_full' : minima_linregress_full, | |
} | |
new_segments = [ new_segment ] | |
else: | |
new_segments1 = _compute_new_segment(pd_candles, start_index, largest_candle_index, cur_recur_depth+1) | |
new_segments2 = _compute_new_segment(pd_candles, largest_candle_index+1, end_index, cur_recur_depth+1) | |
new_segments = new_segments1 + new_segments2 | |
return new_segments | |
segments = [] | |
for end_index in inflection_points: | |
if not segments: | |
start_index = 0 | |
inscope_maxima = [ x for x in maxima if x>=0 and x<end_index ] | |
inscope_minima = [ x for x in minima if x>=0 and x<end_index ] | |
if inscope_maxima and inscope_minima: | |
if sliding_window_how_many_candles<end_index: | |
start_index = sliding_window_how_many_candles | |
new_segments = _compute_new_segment(pd_candles, start_index, end_index, 0, linregress_stderr_threshold, max_recur_depth, min_segment_size_how_many_candles) | |
segments = segments + new_segments | |
else: | |
start_index = segments[-1]['end'] | |
new_segments = _compute_new_segment(pd_candles, start_index, end_index, 0, linregress_stderr_threshold, max_recur_depth, min_segment_size_how_many_candles) | |
segments = segments + new_segments | |
''' | |
You have five kinds of wedges: | |
a. Rising parallel | |
b. Rising converging | |
c. Side way | |
d. Falling parallel | |
e. Falling converging | |
Here, we're merging 'parallel' segments based on slope of 'maxima_linregress_boillenger' and 'minima_linregress_boillenger' from adjacent segments. | |
''' | |
consolidated_segements = [ segments[0] ] | |
for segment in segments: | |
if segment not in consolidated_segements: | |
last_segment = consolidated_segements[-1] | |
last_segment_maxima_slope = last_segment['maxima_linregress_boillenger'].slope | |
last_segment_minima_slope = last_segment['minima_linregress_boillenger'].slope | |
this_segment_maxima_slope = segment['maxima_linregress_boillenger'].slope | |
this_segment_minima_slope = segment['minima_linregress_boillenger'].slope | |
if math.isnan(last_segment_maxima_slope) or math.isnan(last_segment_minima_slope): | |
consolidated_segements.append(segment) | |
else: | |
if ( | |
abs(last_segment_maxima_slope/this_segment_maxima_slope-1)<segment_consolidate_slope_ratio_threshold | |
and abs(last_segment_minima_slope/this_segment_minima_slope-1)<segment_consolidate_slope_ratio_threshold | |
): | |
consolidated_segements.pop() | |
start_index = last_segment['maxima_idx_boillenger'][0] | |
end_index = segment['maxima_idx_boillenger'][-1] | |
maxima_idx_boillenger = [ start_index, end_index ] | |
maxima_close_boillenger = [ last_segment['maxima_close_boillenger'][0], segment['maxima_close_boillenger'][-1] ] | |
minima_idx_boillenger = maxima_idx_boillenger | |
minima_close_boillenger = [ last_segment['minima_close_boillenger'][0], segment['minima_close_boillenger'][-1] ] | |
maxima_linregress_boillenger = linregress(maxima_idx_boillenger, maxima_close_boillenger) | |
minima_linregress_boillenger = linregress(minima_idx_boillenger, minima_close_boillenger) | |
# Using Boillenger upper and lower AND Local maxima/minima | |
start_upper = pd_candles.iloc[start_index]['boillenger_upper'] | |
end_upper = pd_candles.iloc[end_index]['boillenger_upper'] | |
start_lower = pd_candles.iloc[start_index]['boillenger_lower'] | |
end_lower = pd_candles.iloc[end_index]['boillenger_lower'] | |
maxima_idx_full = [last_segment['start']] + [ x for x in maxima if x>=start_index+1 and x<end_index ] + [segment['end']] | |
maxima_close_full = [ start_upper ] + [ pd_candles.loc[x]['close'] for x in maxima if x>start_index and x<end_index ] + [ end_upper ] | |
minima_idx_full = [last_segment['start']] + [ x for x in minima if x>=start_index+1 and x<end_index ] + [segment['end']] | |
minima_close_full = [ start_lower ] + [ pd_candles.loc[x]['close'] for x in minima if x>start_index and x<end_index ] + [ end_lower ] | |
maxima_linregress_full = linregress(maxima_idx_full, maxima_close_full) | |
minima_linregress_full = linregress(minima_idx_full, minima_close_full) | |
new_segment = { | |
'start' : last_segment['start'], | |
'end' : segment['end'], | |
'start_datetime' : last_segment['start_datetime'], | |
'end_datetime' : segment['end_datetime'], | |
'start_close' : last_segment['start_close'], | |
'end_close' : segment['end_close'], | |
'window_size_num_intervals' : end_index - start_index, | |
'cur_recur_depth' : max(last_segment['cur_recur_depth'], segment['cur_recur_depth']), | |
'up_or_down' : 'up' if segment['end_close']>=last_segment['start_close'] else 'down', | |
'maxima_idx_boillenger' : maxima_idx_boillenger, | |
'maxima_close_boillenger' : maxima_close_boillenger, | |
'minima_idx_boillenger' : minima_idx_boillenger, | |
'minima_close_boillenger' : minima_close_boillenger, | |
'maxima_linregress_boillenger' : maxima_linregress_boillenger, | |
'minima_linregress_boillenger' : minima_linregress_boillenger, | |
'maxima_linregress_full' : maxima_linregress_full, | |
'minima_linregress_full' : minima_linregress_full, | |
} | |
consolidated_segements.append(new_segment) | |
else: | |
consolidated_segements.append(segment) | |
''' | |
Depending on 'sliding_window_how_many_candles', pd_candles['boillenger_upper'] and pd_candles['boillenger_lower'] from 'compute_candles_stats' may be nan in first few segments. | |
So here, we're back filling pd_candles['boillenger_upper'] and pd_candles['boillenger_lower'] from subsequent segments. | |
''' | |
last_segment = consolidated_segements[-1] | |
for i in range(len(consolidated_segements)-1, -1, -1): | |
segment = consolidated_segements[i] | |
if math.isnan(segment['maxima_close_boillenger'][0]) or math.isnan(segment['minima_close_boillenger'][0]): | |
start_index = segment['start'] | |
end_index = segment['end'] | |
start_close = segment['start_close'] | |
# Using Boillenger upper and lower only | |
maxima_idx_boillenger = segment['maxima_idx_boillenger'] | |
minima_idx_boillenger = segment['minima_idx_boillenger'] | |
maxima_close_boillenger = segment['maxima_close_boillenger'] | |
minima_close_boillenger = segment['minima_close_boillenger'] | |
if math.isnan(maxima_close_boillenger[-1]) or not math.isnan(minima_close_boillenger[-1]): | |
maxima_close_boillenger[-1] = last_segment['maxima_close_boillenger'][0] | |
minima_close_boillenger[-1] = last_segment['minima_close_boillenger'][0] | |
end_boillenger_height = maxima_close_boillenger[-1] - minima_close_boillenger[-1] | |
maxima_close_boillenger[0] = segment['start_close'] + end_boillenger_height/2 | |
minima_close_boillenger[0] = segment['start_close'] - end_boillenger_height/2 | |
maxima_linregress_boillenger = linregress(maxima_idx_boillenger, maxima_close_boillenger) | |
minima_linregress_boillenger = linregress(minima_idx_boillenger, minima_close_boillenger) | |
# Using Boillenger upper and lower AND Local maxima/minima | |
start_upper = maxima_close_boillenger[0] | |
end_upper = maxima_close_boillenger[-1] | |
start_lower = minima_close_boillenger[0] | |
end_lower = minima_close_boillenger[-1] | |
maxima_idx_full = [start_index] + [ x for x in maxima if x>=start_index+1 and x<end_index ] + [end_index] | |
maxima_close_full = [ start_upper if not math.isnan(start_upper) else start_close ] + [ pd_candles.loc[x]['close'] for x in maxima if x>start_index and x<end_index ] + [ end_upper ] | |
minima_idx_full = [start_index] + [ x for x in minima if x>=start_index+1 and x<end_index ] + [end_index] | |
minima_close_full = [ start_lower if not math.isnan(start_lower) else start_close ] + [ pd_candles.loc[x]['close'] for x in minima if x>start_index and x<end_index ] + [ end_lower ] | |
maxima_linregress_full = linregress(maxima_idx_full, maxima_close_full) | |
minima_linregress_full = linregress(minima_idx_full, minima_close_full) | |
segment['maxima_linregress_boillenger'] = maxima_linregress_boillenger | |
segment['minima_linregress_boillenger'] = minima_linregress_boillenger | |
segment['maxima_linregress_full'] = maxima_linregress_full | |
segment['minima_linregress_full'] = minima_linregress_full | |
last_segment = segment | |
''' | |
You have five kinds of wedges: | |
a. Rising parallel | |
b. Rising converging/diverging | |
c. Side way | |
d. Falling parallel | |
e. Falling converging/diverging | |
''' | |
def classify_segment( | |
segment : Dict, | |
segment_consolidate_slope_ratio_threshold : float, | |
sideway_price_condition_threshold : float | |
): | |
start_close = segment['start_close'] | |
end_close = segment['end_close'] | |
maxima_close_boillenger = segment['maxima_close_boillenger'] | |
minima_close_boillenger = segment['minima_close_boillenger'] | |
start_height : float = maxima_close_boillenger[0] - minima_close_boillenger[0] | |
end_height : float = maxima_close_boillenger[-1] - minima_close_boillenger[-1] | |
upper_slope = segment['maxima_linregress_boillenger'].slope | |
lower_slope = segment['minima_linregress_boillenger'].slope | |
is_parallel : bool = True if abs((upper_slope/lower_slope) -1) > segment_consolidate_slope_ratio_threshold else False | |
is_rising : bool = True if end_close>start_close else False | |
is_sideway : bool = True if abs((start_close/end_close)-1) < sideway_price_condition_threshold else False | |
is_converging : bool = True if start_height>end_height and start_height/end_height>2 else False | |
is_diverging : bool = True if end_height>start_height and end_height/start_height>2 else False | |
if is_sideway: | |
segment['class'] = 'sideway' | |
if is_converging: | |
segment['class'] = 'sideway_converging' | |
elif is_diverging: | |
segment['class'] = 'sideway_diverging' | |
else: | |
if is_rising: | |
if is_parallel: | |
segment['class'] = 'rising_parallel' | |
else: | |
if is_converging: | |
segment['class'] = 'rising_converging' | |
elif is_diverging: | |
segment['class'] = 'rising_diverging' | |
else: | |
segment['class'] = 'rising_parallel' | |
else: | |
if is_parallel: | |
segment['class'] = 'falling_parallel' | |
else: | |
if is_converging: | |
segment['class'] = 'falling_converging' | |
elif is_diverging: | |
segment['class'] = 'falling_diverging' | |
else: | |
segment['class'] = 'falling_parallel' | |
for segment in consolidated_segements: | |
classify_segment(segment, segment_consolidate_slope_ratio_threshold, sideway_price_condition_threshold) | |
return { | |
'minima' : minima, | |
'maxima' : maxima, | |
'segments' : consolidated_segements | |
} | |
_ticker = ticker.split(":")[0].replace("/","") | |
target_candle_file_name : str = f'{_ticker}_candles_{dt_start.strftime("%Y-%m-%d-%H-%M-%S")}_{dt_end.strftime("%Y-%m-%d-%H-%M-%S")}_{ts_candle_size}.csv' | |
if not os.path.isfile(target_candle_file_name): | |
candles = fetch_candles( | |
cutoff_ts=dt_start.timestamp(), test_end_date_ts=dt_end.timestamp(), | |
exchange=target_exchange, | |
normalized_symbols=[ticker], | |
candle_size = ts_candle_size, | |
how_many_candles = ts_how_many_candles, | |
num_candles_limit=num_candles_limit, | |
logger=logger, | |
cache_dir=CACHE_CANDLES | |
) | |
pd_candles = candles[ticker] | |
compute_candles_stats(pd_candles, boillenger_std_multiples, sliding_window_how_many_candles) | |
pd_candles.to_csv(target_candle_file_name) | |
else: | |
pd_candles = pd.read_csv(target_candle_file_name) | |
fix_column_types(pd_candles) | |
# ATH (All Time High) | |
ath_global_index = pd_candles['close'].idxmax() | |
ath_global_price = pd_candles['close'].max() | |
result = partition_sliding_window( | |
pd_candles = pd_candles, | |
sliding_window_how_many_candles = sliding_window_how_many_candles, | |
smoothing_window_size_ratio = smoothing_window_size_ratio, | |
linregress_stderr_threshold = linregress_stderr_threshold, | |
max_recur_depth = max_recur_depth, | |
min_segment_size_how_many_candles = min_segment_size_how_many_candles, | |
segment_consolidate_slope_ratio_threshold = segment_consolidate_slope_ratio_threshold, | |
sideway_price_condition_threshold = sideway_price_condition_threshold | |
) | |
minima = result['minima'] | |
maxima = result['maxima'] | |
segments = result['segments'] | |
pd_segments = pd.DataFrame( | |
[ | |
{ | |
'start' : segment['start'], | |
'end' : segment['end'], | |
'start_datetime' : segment['start_datetime'], | |
'end_datetime' : segment['end_datetime'], | |
'start_close' : segment['start_close'], | |
'end_close' : segment['end_close'], | |
'window_size_num_intervals' : segment['window_size_num_intervals'], | |
'cur_recur_depth' : segment['cur_recur_depth'], | |
'up_or_down' : segment['up_or_down'], | |
'class' : segment['class'], | |
'maxima_linregress_slope' : segment['maxima_linregress_full'].slope, | |
'maxima_linregress_intercept' : segment['maxima_linregress_full'].intercept, | |
'maxima_linregress_std_err' : segment['maxima_linregress_full'].stderr, | |
'minima_linregress_slope' : segment['minima_linregress_full'].slope, | |
'minima_linregress_intercept' : segment['minima_linregress_full'].intercept, | |
'minima_linregress_std_err' : segment['minima_linregress_full'].stderr | |
} | |
for segment in segments ] | |
) | |
import matplotlib.pyplot as plt | |
import matplotlib.gridspec as gridspec | |
fig = plt.figure(figsize=(15, 15), facecolor='black') | |
gs = gridspec.GridSpec(3, 1, height_ratios=[2, 0.25, 1.25]) | |
# Price Chart | |
ax0 = plt.subplot(gs[0]) | |
ax0.plot(pd_candles['datetime'], pd_candles['close'], label='Close', color='dodgerblue') | |
ax0.plot(pd_candles['datetime'], pd_candles['smoothed_close'], label='Smoothed Close', color='yellow') | |
ax0.plot(pd_candles['datetime'], pd_candles['ema_close'], label='3m EMA', linestyle='--', color='orange') | |
ax0.fill_between(pd_candles['datetime'], pd_candles['close'], pd_candles['ema_close'], where=(pd_candles['close'] > pd_candles['ema_close']), interpolate=True, color='dodgerblue', alpha=0.3, label='Bull Market') | |
ax0.fill_between(pd_candles['datetime'], pd_candles['close'], pd_candles['ema_close'], where=(pd_candles['close'] <= pd_candles['ema_close']), interpolate=True, color='red', alpha=0.3, label='Bear Market') | |
ax0.set_title('Close vs EMA', color='white') | |
ax0.set_xlabel('Date', color='white') | |
ax0.set_ylabel('Price', color='white') | |
legend = ax0.legend() | |
legend.get_frame().set_facecolor('black') | |
legend.get_frame().set_edgecolor('white') | |
for text in legend.get_texts(): | |
text.set_color('white') | |
ax0.plot(pd_candles['datetime'][ath_global_index], ath_global_price, marker='x', markersize=10, color='red', label='ATH Global') | |
# @CRITICAL close vs smoothed_close and merge_distance | |
for maxima_index in maxima: | |
ax0.plot(pd_candles['datetime'][maxima_index], pd_candles['close'][maxima_index], marker='+', markersize=8, color='yellow', label='maxima') | |
for minima_index in minima: | |
ax0.plot(pd_candles['datetime'][minima_index], pd_candles['close'][minima_index], marker='o', markersize=5, color='yellow', label='minima') | |
for segment in segments: | |
ax0.axvline(x=pd_candles['datetime'][segment['end']], color='gray', linewidth=2, linestyle='--') | |
if segment['maxima_linregress_boillenger'] is not None: | |
''' | |
We don't need to compute y_series like this: | |
slope_maxima = segment['maxima_linregress_boillenger'].slope | |
intercept_maxima = segment['maxima_linregress_boillenger'].intercept | |
segment_maxima_dates = pd_candles['datetime'][segment['maxima_idx_boillenger']] | |
y_series = [ slope_maxima * idx + intercept_maxima for idx in segment['maxima_idx_boillenger'] ] | |
But, syntax is just for reference. | |
''' | |
x_series = [ pd_candles.loc[idx]['datetime'] for idx in segment['maxima_idx_boillenger'] ] # x = dates | |
y_series = [ x for x in segment['maxima_close_boillenger'] ] # y = boillenger upper | |
ax0.plot( | |
x_series, | |
y_series, | |
color='green', linestyle='--', label='Maxima Linear Regression') | |
if segment['minima_linregress_boillenger'] is not None: | |
x_series = [ pd_candles.loc[idx]['datetime'] for idx in segment['minima_idx_boillenger'] ] # x = dates | |
y_series = [ x for x in segment['minima_close_boillenger'] ] # y = boillenger lower | |
ax0.plot( | |
x_series, | |
y_series, | |
color='red', linestyle='--', label='Minima Linear Regression') | |
# Volatility Chart | |
ax1 = plt.subplot(gs[1], sharex=ax0) | |
ax1.plot(pd_candles['datetime'], pd_candles['std'], label='Volatility', color='purple') | |
ax1.set_xlabel('Date', color='white') | |
ax1.set_ylabel('Volatility', color='white') | |
# EMA Close Slope Chart (Bar Chart) | |
ax2 = plt.subplot(gs[2], sharex=ax0) | |
ax2.bar(pd_candles['datetime'], pd_candles['ema_long_slope'], color='cyan', label='EMA Close Slope') | |
ax2.set_xlabel('Date', color='white') | |
ax2.set_ylabel('EMA Close Slope', color='white') | |
ax0.set_facecolor('black') | |
ax1.set_facecolor('black') | |
ax2.set_facecolor('black') | |
ax0.tick_params(axis='x', colors='white') | |
ax0.tick_params(axis='y', colors='white') | |
ax1.tick_params(axis='x', colors='white') | |
ax1.tick_params(axis='y', colors='white') | |
ax2.tick_params(axis='x', colors='white') | |
ax2.tick_params(axis='y', colors='white') | |
# Show the plot | |
plt.grid(True) | |
plt.tight_layout() | |
plt.show() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment