ts_slicer
import os
import sys
import json
from datetime import datetime
import logging
import arrow
from typing import Dict, List
import enum
import math
import pandas as pd
import statsmodels.api as sm # incompatible with PyPy
from hurst import compute_Hc # compatible with PyPy
from scipy.stats import linregress
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from ccxt.binance import binance
from ccxt.okex5 import okex5
from ccxt.bybit import bybit
# For performance profiling
import cProfile
from pstats import Stats, SortKey
import random
import hashlib
import time
sys.path.append('./gizmo')
from market_data_gizmo import fetch_candles
base_dir : str = f"{os.path.dirname(sys.path[0])}\\ts_slicer"
REPORT_NAME : str = "ts_slicer"
CACHE_CANDLES : str = f"{os.path.dirname(sys.path[0])}\\casin0_bt\\cache\\research\\candles"
num_candles_limit : int = 100 # Depends on the exchange, but 100 is generally fine.
param = {
'apiKey' : None,
'secret' : None,
'password' : None, # Most other exchanges don't require this; it's stored in exchange.password.
'subaccount' : None,
'rateLimit' : 100, # In ms
'options' : {
'defaultType': 'swap', # Should test linear instead
'leg_room_bps' : 5,
'trade_fee_bps' : 3
}
}
ticker : str = 'BTC/USDT:USDT'
dt_start : datetime = datetime(2024,1,1)
dt_end : datetime = datetime(2024,4,11)
ts_candle_size : str = '1h'
'''
Length of the sliding window, relative to the overall window between dt_start and dt_end.
Example: dt_start = 20200101, dt_end = 20240101, i.e. 4 years in total.
If ts_how_many_candles covers the full 4 years (1460 days) and the sliding window covers
365 * 3/12 = 91.25 days, then sliding_window_ratio = 1460 / 91.25 = 16.
So 16 is my favourite.
'''
sliding_window_ratio = 16
ts_how_many_candles : int = 0
total_seconds = (dt_end - dt_start).total_seconds()
total_hours = total_seconds / 3600
total_days = total_hours / 24
sliding_window_how_many_candles : int = 0
if ts_candle_size == '1h':
sliding_window_how_many_candles = int(total_hours / sliding_window_ratio)
elif ts_candle_size == '1d':
sliding_window_how_many_candles = int(total_days / sliding_window_ratio)
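# Worked example (a sanity check only, using the settings above): dt_start=2024-01-01 to dt_end=2024-04-11
# spans 101 days = 2424 hours, so with '1h' candles and sliding_window_ratio=16 the sliding window
# works out to int(2424 / 16) = 151 candles.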
smoothing_window_size_ratio : int = 3
boillenger_std_multiples : float = 1
linregress_stderr_threshold : float = 10
max_recur_depth : int = 2
min_segment_size_how_many_candles : int = 15
segment_consolidate_slope_ratio_threshold : float = 2
sideway_price_condition_threshold : float = 0.05 # i.e. if price stays within 5% between segment start and end, it's considered a 'Sideway' market.
# atm, only support one single exchange
target_exchange = okex5(param)
target_exchange.name='okx_linear'
full_report_name : str = f"{REPORT_NAME}_"
logging.Formatter.converter = time.gmtime
logger = logging.getLogger(REPORT_NAME)
log_level = logging.INFO # DEBUG --> INFO --> WARNING --> ERROR
logger.setLevel(log_level)
format_str = '%(asctime)s %(message)s'
formatter = logging.Formatter(format_str)
sh = logging.StreamHandler()
sh.setLevel(log_level)
sh.setFormatter(formatter)
logger.addHandler(sh)
fh = logging.FileHandler(f"{full_report_name}.log")
fh.setLevel(log_level)
fh.setFormatter(formatter)
logger.addHandler(fh)
def log(message):
if not isinstance(message, Dict):
logger.info(message)
else:
logger.info(json.dumps(message, indent=4))
'''
The implementation from GeeksforGeeks https://www.geeksforgeeks.org/find-indices-of-all-local-maxima-and-local-minima-in-an-array/ is wrong:
if you have consecutive duplicates, things fall apart.
Example 1: values = [ 1, 2, 3, 7, 11, 15, 13, 12, 11, 6, 5, 7, 11, 8]
The default implementation correctly identifies "15" as a peak.
Example 2: values = [ 1, 2, 3, 7, 11, 15, 15, 13, 12, 11, 6, 5, 7, 11, 8]
The default implementation marks "11" as a local maximum because there are two consecutive 15's.
Fix: https://stackoverflow.com/questions/75013708/python-finding-local-minima-and-maxima?noredirect=1#comment132376733_75013708
'''
def find_local_max_min(values: List[float], merge_distance: int = 5):
mx = []
mn = []
n = len(values)
if n < 2:
return None
# Endpoint handling: a leading drop means index 0 is a local max; a leading rise means a local min (mirrors the end-of-array handling below).
if values[0] > values[1]:
mx.append(0)
elif values[0] < values[1]:
mn.append(0)
for i in range(1, n-1):
if all(values[i] >= values[j] for j in range(i-10, i+11) if 0 <= j < n):
mx.append(i)
elif all(values[i] <= values[j] for j in range(i-10, i+11) if 0 <= j < n):
mn.append(i)
if values[-1] > values[-2]:
mx.append(n-1)
elif values[-1] < values[-2]:
mn.append(n-1)
# Merge nearby maxima and minima
mx_merged = []
mn_merged = []
def merge_nearby_points(points):
merged = []
if not points:
return merged
start = points[0]
for i in range(1, len(points)):
if points[i] - start > merge_distance:
merged.append(start + (points[i-1] - start) // 2) # Take the middle point
start = points[i]
merged.append(start + (points[-1] - start) // 2) # Take the middle point for the last segment
return merged
mx_merged = merge_nearby_points(mx)
mn_merged = merge_nearby_points(mn)
return {
'local_max': mx_merged,
'local_min': mn_merged
}
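# Illustrative check of the consecutive-duplicates issue described above (a quick sketch, not part of
# the pipeline; _example_extrema is just a throwaway name). The second example has two consecutive 15's,
# and index 5 should still be reported as a local maximum.
_example_extrema = find_local_max_min([1, 2, 3, 7, 11, 15, 15, 13, 12, 11, 6, 5, 7, 11, 8], merge_distance=1)
assert 5 in _example_extrema['local_max']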
def fix_column_types(pd_candles : pd.DataFrame):
pd_candles['open'] = pd_candles['open'].astype(float)
pd_candles['high'] = pd_candles['high'].astype(float)
pd_candles['low'] = pd_candles['low'].astype(float)
pd_candles['close'] = pd_candles['close'].astype(float)
pd_candles['volume'] = pd_candles['volume'].astype(float)
pd_candles['datetime'] = pd.to_datetime(pd_candles['datetime'])
# This is to make it easy to do grouping with Excel pivot table
pd_candles['year'] = pd_candles['datetime'].dt.year
pd_candles['month'] = pd_candles['datetime'].dt.month
pd_candles['day'] = pd_candles['datetime'].dt.day
pd_candles['hour'] = pd_candles['datetime'].dt.hour
pd_candles['minute'] = pd_candles['datetime'].dt.minute
# Fibonacci
MAGIC_FIB_LEVELS = [0, 0.236, 0.382, 0.5, 0.618, 0.786, 1.00, 1.618, 2.618, 3.618, 4.236]
def estimate_fib_retracement(
swing_low: float,
swing_low_idx: int,
swing_high: float,
swing_high_idx: int,
target_fib_level: float = 0.618
):
price_range = swing_high - swing_low
# https://blog.quantinsti.com/fibonacci-retracement-trading-strategy-python/
if swing_low_idx < swing_high_idx:
retracement_price = swing_high - (price_range * target_fib_level)
else:
retracement_price = swing_low + (price_range * target_fib_level)
return retracement_price
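# Worked example (illustrative numbers only): with a swing low of 50_000 at index 10 and a swing high
# of 70_000 at index 40, the range is 20_000; since the low comes first, the 0.618 retracement is
# projected down from the high: estimate_fib_retracement(50_000, 10, 70_000, 40, 0.618) == 57_640.0.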
'''
compute_candles_stats calculates typical technical indicators used in many trading strategies:
a. Basic SMA/EMAs (and their slopes)
b. ATR
c. Hurst Exponent
d. RSI
e. MACD
f. Fibonacci retracement levels
g. Inflection points: where 'close' crosses the EMA from above or below.
Parameters:
a. boillenger_std_multiples: std-dev multiple for the Bollinger upper and lower bands
b. sliding_window_how_many_candles: window length for the moving-average calculations
c. rsi_ema: if True, RSI uses an EMA of gains/losses; otherwise an SMA
d. boillenger_ema: if True, Bollinger bands are centred on the EMA; otherwise on the SMA
e. slow_fast_interval_ratio:
MACD is calculated from two moving averages.
The slow line uses 'sliding_window_how_many_candles' intervals.
The fast line uses 'sliding_window_how_many_candles / slow_fast_interval_ratio' intervals.
Example: if the slow line uses 24 candles and slow_fast_interval_ratio = 3,
then the fast line uses 24 / 3 = 8 candles.
'''
def compute_candles_stats(
pd_candles : pd.DataFrame,
boillenger_std_multiples : float,
sliding_window_how_many_candles : int,
rsi_ema : bool = True,
boillenger_ema : bool = False,
slow_fast_interval_ratio : float = 3
):
fix_column_types(pd_candles)
pd_candles['candle_height'] = pd_candles['high'] - pd_candles['low']
'''
market_data_gizmo inserts dummy rows. Those must be excluded first (pd_btc_candles = pd_btc_candles[pd_btc_candles.close.notnull()]),
or comparisons like the one below raise "TypeError: unorderable types for comparison":
pd_btc_candles.loc[
(pd_btc_candles['close_above_or_below_ema'] != pd_btc_candles['close_above_or_below_ema'].shift(1)) &
(abs(pd_btc_candles['gap_close_vs_ema']) > avg_gap_close_vs_ema),
'close_vs_ema_inflection'
] = np.sign(pd_btc_candles['close'] - pd_btc_candles['ema_long_periods'])
'''
# pd_candles = pd_candles[pd_candles.close.notnull()] # Don't make a copy. Drop in-place
pd_candles.drop(pd_candles[pd_candles.close.isnull()].index, inplace=True)
pd_candles['pct_change_close'] = pd_candles['close'].pct_change() * 100
pd_candles['sma_short_periods'] = pd_candles['close'].rolling(window=int(sliding_window_how_many_candles/slow_fast_interval_ratio)).mean()
pd_candles['sma_long_periods'] = pd_candles['close'].rolling(window=sliding_window_how_many_candles).mean()
pd_candles['ema_short_periods'] = pd_candles['close'].ewm(span=int(sliding_window_how_many_candles/slow_fast_interval_ratio), adjust=False).mean()
pd_candles['ema_long_periods'] = pd_candles['close'].ewm(span=sliding_window_how_many_candles, adjust=False).mean()
pd_candles['ema_close'] = pd_candles['ema_long_periods'] # Alias, shorter name
pd_candles['std'] = pd_candles['close'].rolling(window=sliding_window_how_many_candles).std()
pd_candles['max_short_periods'] = pd_candles['close'].rolling(window=int(sliding_window_how_many_candles/slow_fast_interval_ratio)).max()
pd_candles['max_long_periods'] = pd_candles['close'].rolling(window=sliding_window_how_many_candles).max()
pd_candles['idmax_short_periods'] = pd_candles['close'].rolling(window=int(sliding_window_how_many_candles/slow_fast_interval_ratio)).apply(lambda x : x.idxmax())
pd_candles['idmax_long_periods'] = pd_candles['close'].rolling(window=sliding_window_how_many_candles).apply(lambda x : x.idxmax())
pd_candles['min_short_periods'] = pd_candles['close'].rolling(window=int(sliding_window_how_many_candles/slow_fast_interval_ratio)).min()
pd_candles['min_long_periods'] = pd_candles['close'].rolling(window=sliding_window_how_many_candles).min()
pd_candles['idmin_short_periods'] = pd_candles['close'].rolling(window=int(sliding_window_how_many_candles/slow_fast_interval_ratio)).apply(lambda x : x.idxmin())
pd_candles['idmin_long_periods'] = pd_candles['close'].rolling(window=sliding_window_how_many_candles).apply(lambda x : x.idxmin())
# ATR https://medium.com/codex/detecting-ranging-and-trending-markets-with-choppiness-index-in-python-1942e6450b58
pd_candles.loc[:,'h_l'] = pd_candles['high'] - pd_candles['low']
pd_candles.loc[:,'h_pc'] = abs(pd_candles['high'] - pd_candles['close'].shift(1))
pd_candles.loc[:,'l_pc'] = abs(pd_candles['low'] - pd_candles['close'].shift(1))
pd_candles.loc[:,'tr'] = pd_candles[['h_l', 'h_pc', 'l_pc']].max(axis=1)
pd_candles.loc[:,'atr'] = pd_candles['tr'].rolling(window=sliding_window_how_many_candles).mean()
# Hurst https://towardsdatascience.com/introduction-to-the-hurst-exponent-with-code-in-python-4da0414ca52e
pd_candles['hurst_exp'] = pd_candles['close'].rolling(window=120).apply(lambda x: compute_Hc(x, kind='price', simplified=True)[0])
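# Note: the Hurst window above is hard-coded to 120 candles rather than sliding_window_how_many_candles.
# Rough reading: H < 0.5 suggests mean reversion, H ~ 0.5 a random walk, H > 0.5 a trending series.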
# Boillenger https://www.quantifiedstrategies.com/python-bollinger-band-trading-strategy/
pd_candles.loc[:,'boillenger_upper'] = (pd_candles['sma_long_periods'] if not boillenger_ema else pd_candles['ema_long_periods']) + pd_candles['std'] * boillenger_std_multiples
pd_candles.loc[:,'boillenger_lower'] = (pd_candles['sma_long_periods'] if not boillenger_ema else pd_candles['ema_long_periods']) - pd_candles['std'] * boillenger_std_multiples
pd_candles.loc[:,'boillenger_channel_height'] = pd_candles['boillenger_upper'] - pd_candles['boillenger_lower']
# RSI - https://www.youtube.com/watch?v=G9oUTi-PI18&t=809s
pd_candles.loc[:,'close_delta'] = pd_candles['close'].diff()
lo_up = pd_candles['close_delta'].clip(lower=0)
lo_down = -1 * pd_candles['close_delta'].clip(upper=0)
pd_candles.loc[:,'up'] = lo_up
pd_candles.loc[:,'down'] = lo_down
if rsi_ema:
# Use exponential moving average
lo_ma_up = lo_up.ewm(com = sliding_window_how_many_candles - 1, adjust=True, min_periods = sliding_window_how_many_candles).mean()
lo_ma_down = lo_down.ewm(com = sliding_window_how_many_candles - 1, adjust=True, min_periods = sliding_window_how_many_candles).mean()
else:
# Use simple moving average
lo_ma_up = lo_up.rolling(window = sliding_window_how_many_candles).mean()
lo_ma_down = lo_down.rolling(window = sliding_window_how_many_candles).mean()
lo_rs = lo_ma_up / lo_ma_down
pd_candles.loc[:,'rsi'] = 100 - (100/(1 + lo_rs))
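# i.e. RS = (smoothed average gain) / (smoothed average loss) over the window, and RSI = 100 - 100 / (1 + RS).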
# MACD https://www.investopedia.com/terms/m/macd.asp
pd_candles['macd'] = pd_candles['ema_short_periods'] - pd_candles['ema_long_periods']
pd_candles['signal'] = pd_candles['macd'].ewm(span=9, adjust=False).mean()
pd_candles['macd_minus_signal'] = pd_candles['macd'] - pd_candles['signal']
# Slope on EMA
X = sm.add_constant(range(len(pd_candles['ema_short_periods'])))
rolling_slope = pd_candles['ema_short_periods'].rolling(window=int(sliding_window_how_many_candles/slow_fast_interval_ratio)).apply(lambda x: sm.OLS(x, X[:len(x)]).fit().params[1], raw=False)
pd_candles['ema_short_slope'] = rolling_slope
X = sm.add_constant(range(len(pd_candles['ema_long_periods'])))
rolling_slope = pd_candles['ema_long_periods'].rolling(window=sliding_window_how_many_candles).apply(lambda x: sm.OLS(x, X[:len(x)]).fit().params[1], raw=False)
pd_candles['ema_long_slope'] = rolling_slope
# Fibonacci
TARGET_FIB_LEVEL = 0.618
pd_candles['fib_618_short_periods'] = pd_candles.apply(lambda rw : estimate_fib_retracement(rw['min_short_periods'], rw['idmin_short_periods'], rw['max_short_periods'], rw['idmax_short_periods'], TARGET_FIB_LEVEL), axis=1)
pd_candles['fib_618_long_periods'] = pd_candles.apply(lambda rw : estimate_fib_retracement(rw['min_long_periods'], rw['idmin_long_periods'], rw['max_long_periods'], rw['idmax_long_periods'], TARGET_FIB_LEVEL), axis=1)
# Inflection points
pd_candles['gap_close_vs_ema'] = pd_candles['close'] - pd_candles['ema_long_periods']
pd_candles['close_above_or_below_ema'] = None
pd_candles.loc[pd_candles['gap_close_vs_ema'] > 0, 'close_above_or_below_ema'] = 'above'
pd_candles.loc[pd_candles['gap_close_vs_ema'] < 0, 'close_above_or_below_ema'] = 'below'
pd_candles.loc[
(pd_candles['close_above_or_below_ema'] != pd_candles['close_above_or_below_ema'].shift(-1)),
'close_vs_ema_inflection'
] = np.sign(pd_candles['close'] - pd_candles['ema_long_periods'])
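# Illustrative usage (a sketch only; assumes pd_candles already holds raw OHLCV rows with a 'datetime'
# column, e.g. one of the DataFrames returned by fetch_candles further down, and 168 is just an example window):
#   compute_candles_stats(pd_candles, boillenger_std_multiples=1, sliding_window_how_many_candles=168)
#   pd_candles[['datetime', 'close', 'ema_long_periods', 'rsi', 'macd', 'atr']].tail()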
def partition_sliding_window(
pd_candles : pd.DataFrame,
sliding_window_how_many_candles : int,
smoothing_window_size_ratio : int,
linregress_stderr_threshold : float,
max_recur_depth : int,
min_segment_size_how_many_candles : int,
segment_consolidate_slope_ratio_threshold : float,
sideway_price_condition_threshold
) -> List[Dict]:
window_size = int(sliding_window_how_many_candles/smoothing_window_size_ratio)
# window_size = 8 # @hack
smoothed_close = pd.Series(pd_candles['close']).rolling(window=window_size, min_periods=window_size).mean()
pd_candles['smoothed_close'] = smoothed_close
maxima_minima = find_local_max_min(values = pd_candles['close'].to_list(), merge_distance=1) # @CRITICAL close vs smoothed_close and merge_distance
maxima = maxima_minima['local_max']
minima = maxima_minima['local_min']
maxima = [x for x in maxima if x>=pd_candles.index.min()]
minima = [x for x in minima if x>=pd_candles.index.min()]
pd_candles['maxima'] = False
pd_candles['minima'] = False
pd_candles.loc[maxima, 'maxima'] = True
pd_candles.loc[minima, 'minima'] = True
inflection_points = pd_candles[(pd_candles.close_vs_ema_inflection == 1) | (pd_candles.close_vs_ema_inflection == -1)].index.tolist()
inflection_points = [ index-1 for index in inflection_points ]
if (pd_candles.shape[0]-1) not in inflection_points:
inflection_points.append(pd_candles.shape[0]-1)
last_point = inflection_points[0]
sparse_inflection_points = [ last_point ]
for point in inflection_points:
if (point not in sparse_inflection_points) and ((point-last_point)>min_segment_size_how_many_candles):
sparse_inflection_points.append(point)
last_point = point
inflection_points = sparse_inflection_points
def _compute_new_segment(
pd_candles : pd.DataFrame,
start_index : int,
end_index : int,
cur_recur_depth : int,
linregress_stderr_threshold : float = 50,
max_recur_depth : int = 2,
min_segment_size_how_many_candles : int = 15
) -> List[Dict]:
new_segments : List[Dict] = None
if end_index>pd_candles.shape[0]-1:
end_index = pd_candles.shape[0]-1
start_upper = pd_candles.iloc[start_index]['boillenger_upper']
end_upper = pd_candles.iloc[end_index]['boillenger_upper']
start_lower = pd_candles.iloc[start_index]['boillenger_lower']
end_lower = pd_candles.iloc[end_index]['boillenger_lower']
start_datetime = pd_candles.iloc[start_index]['datetime']
end_datetime = pd_candles.iloc[end_index]['datetime']
start_close = pd_candles.iloc[start_index]['close']
end_close = pd_candles.iloc[end_index]['close']
# Using Boillenger upper and lower only
maxima_idx_boillenger = [ start_index, end_index ]
maxima_close_boillenger = [ start_upper, end_upper ]
minima_idx_boillenger = [ start_index, end_index ]
minima_close_boillenger = [ start_lower, end_lower ]
maxima_linregress_boillenger = linregress(maxima_idx_boillenger, maxima_close_boillenger)
minima_linregress_boillenger = linregress(minima_idx_boillenger, minima_close_boillenger)
# Using Boillenger upper and lower AND Local maxima/minima
maxima_idx_full = [start_index] + [ x for x in maxima if x>=start_index+1 and x<end_index ] + [end_index]
maxima_close_full = [ start_upper if not math.isnan(start_upper) else start_close ] + [ pd_candles.loc[x]['close'] for x in maxima if x>start_index and x<end_index ] + [ end_upper ]
minima_idx_full = [start_index] + [ x for x in minima if x>=start_index+1 and x<end_index ] + [end_index]
minima_close_full = [ start_lower if not math.isnan(start_lower) else start_close ] + [ pd_candles.loc[x]['close'] for x in minima if x>start_index and x<end_index ] + [ end_lower ]
maxima_linregress_full = linregress(maxima_idx_full, maxima_close_full)
minima_linregress_full = linregress(minima_idx_full, minima_close_full)
largest_candle_index = pd_candles.iloc[start_index:end_index,:]['candle_height'].idxmax()
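# Keep [start_index, end_index] as one segment when both Boillenger regressions already fit tightly
# (stderr below threshold), when max recursion depth is reached, when the largest candle sits at the
# window boundary, or when splitting there would leave a piece shorter than min_segment_size_how_many_candles;
# otherwise split at the largest candle and recurse on both halves.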
if (
(abs(maxima_linregress_full.stderr) < linregress_stderr_threshold and abs(minima_linregress_full.stderr) < linregress_stderr_threshold)
or cur_recur_depth>=max_recur_depth
or (start_index==largest_candle_index or end_index==largest_candle_index+1)
or (
(end_index-largest_candle_index < min_segment_size_how_many_candles)
or (largest_candle_index - start_index < min_segment_size_how_many_candles)
)
):
new_segment = {
'start' : start_index,
'end' : end_index,
'start_datetime' : start_datetime,
'end_datetime' : end_datetime,
'start_close' : start_close,
'end_close' : end_close,
'window_size_num_intervals' : end_index - start_index,
'cur_recur_depth' : cur_recur_depth,
'up_or_down' : 'up' if end_close>=start_close else 'down',
'maxima_idx_boillenger' : maxima_idx_boillenger,
'maxima_close_boillenger' : maxima_close_boillenger,
'minima_idx_boillenger' : minima_idx_boillenger,
'minima_close_boillenger' : minima_close_boillenger,
'maxima_linregress_boillenger' : maxima_linregress_boillenger,
'minima_linregress_boillenger' : minima_linregress_boillenger,
'maxima_linregress_full' : maxima_linregress_full,
'minima_linregress_full' : minima_linregress_full,
}
new_segments = [ new_segment ]
else:
new_segments1 = _compute_new_segment(pd_candles, start_index, largest_candle_index, cur_recur_depth+1)
new_segments2 = _compute_new_segment(pd_candles, largest_candle_index+1, end_index, cur_recur_depth+1)
new_segments = new_segments1 + new_segments2
return new_segments
segments = []
for end_index in inflection_points:
if not segments:
start_index = 0
inscope_maxima = [ x for x in maxima if x>=0 and x<end_index ]
inscope_minima = [ x for x in minima if x>=0 and x<end_index ]
if inscope_maxima and inscope_minima:
if sliding_window_how_many_candles<end_index:
start_index = sliding_window_how_many_candles
new_segments = _compute_new_segment(pd_candles, start_index, end_index, 0, linregress_stderr_threshold, max_recur_depth, min_segment_size_how_many_candles)
segments = segments + new_segments
else:
start_index = segments[-1]['end']
new_segments = _compute_new_segment(pd_candles, start_index, end_index, 0, linregress_stderr_threshold, max_recur_depth, min_segment_size_how_many_candles)
segments = segments + new_segments
'''
You have five kinds of wedges:
a. Rising parallel
b. Rising converging
c. Side way
d. Falling parallel
e. Falling converging
Here, we're merging 'parallel' segments based on slope of 'maxima_linregress_boillenger' and 'minima_linregress_boillenger' from adjacent segments.
'''
consolidated_segements = [ segments[0] ]
for segment in segments:
if segment not in consolidated_segements:
last_segment = consolidated_segements[-1]
last_segment_maxima_slope = last_segment['maxima_linregress_boillenger'].slope
last_segment_minima_slope = last_segment['minima_linregress_boillenger'].slope
this_segment_maxima_slope = segment['maxima_linregress_boillenger'].slope
this_segment_minima_slope = segment['minima_linregress_boillenger'].slope
if math.isnan(last_segment_maxima_slope) or math.isnan(last_segment_minima_slope):
consolidated_segements.append(segment)
else:
if (
abs(last_segment_maxima_slope/this_segment_maxima_slope-1)<segment_consolidate_slope_ratio_threshold
and abs(last_segment_minima_slope/this_segment_minima_slope-1)<segment_consolidate_slope_ratio_threshold
):
consolidated_segements.pop()
start_index = last_segment['maxima_idx_boillenger'][0]
end_index = segment['maxima_idx_boillenger'][-1]
maxima_idx_boillenger = [ start_index, end_index ]
maxima_close_boillenger = [ last_segment['maxima_close_boillenger'][0], segment['maxima_close_boillenger'][-1] ]
minima_idx_boillenger = maxima_idx_boillenger
minima_close_boillenger = [ last_segment['minima_close_boillenger'][0], segment['minima_close_boillenger'][-1] ]
maxima_linregress_boillenger = linregress(maxima_idx_boillenger, maxima_close_boillenger)
minima_linregress_boillenger = linregress(minima_idx_boillenger, minima_close_boillenger)
# Using Boillenger upper and lower AND Local maxima/minima
start_upper = pd_candles.iloc[start_index]['boillenger_upper']
end_upper = pd_candles.iloc[end_index]['boillenger_upper']
start_lower = pd_candles.iloc[start_index]['boillenger_lower']
end_lower = pd_candles.iloc[end_index]['boillenger_lower']
maxima_idx_full = [last_segment['start']] + [ x for x in maxima if x>=start_index+1 and x<end_index ] + [segment['end']]
maxima_close_full = [ start_upper ] + [ pd_candles.loc[x]['close'] for x in maxima if x>start_index and x<end_index ] + [ end_upper ]
minima_idx_full = [last_segment['start']] + [ x for x in minima if x>=start_index+1 and x<end_index ] + [segment['end']]
minima_close_full = [ start_lower ] + [ pd_candles.loc[x]['close'] for x in minima if x>start_index and x<end_index ] + [ end_lower ]
maxima_linregress_full = linregress(maxima_idx_full, maxima_close_full)
minima_linregress_full = linregress(minima_idx_full, minima_close_full)
new_segment = {
'start' : last_segment['start'],
'end' : segment['end'],
'start_datetime' : last_segment['start_datetime'],
'end_datetime' : segment['end_datetime'],
'start_close' : last_segment['start_close'],
'end_close' : segment['end_close'],
'window_size_num_intervals' : end_index - start_index,
'cur_recur_depth' : max(last_segment['cur_recur_depth'], segment['cur_recur_depth']),
'up_or_down' : 'up' if segment['end_close']>=last_segment['start_close'] else 'down',
'maxima_idx_boillenger' : maxima_idx_boillenger,
'maxima_close_boillenger' : maxima_close_boillenger,
'minima_idx_boillenger' : minima_idx_boillenger,
'minima_close_boillenger' : minima_close_boillenger,
'maxima_linregress_boillenger' : maxima_linregress_boillenger,
'minima_linregress_boillenger' : minima_linregress_boillenger,
'maxima_linregress_full' : maxima_linregress_full,
'minima_linregress_full' : minima_linregress_full,
}
consolidated_segements.append(new_segment)
else:
consolidated_segements.append(segment)
'''
Depending on 'sliding_window_how_many_candles', pd_candles['boillenger_upper'] and pd_candles['boillenger_lower'] from 'compute_candles_stats' may be NaN in the first few segments.
So here, we back-fill pd_candles['boillenger_upper'] and pd_candles['boillenger_lower'] from subsequent segments.
'''
last_segment = consolidated_segements[-1]
for i in range(len(consolidated_segements)-1, -1, -1):
segment = consolidated_segements[i]
if math.isnan(segment['maxima_close_boillenger'][0]) or math.isnan(segment['minima_close_boillenger'][0]):
start_index = segment['start']
end_index = segment['end']
start_close = segment['start_close']
# Using Boillenger upper and lower only
maxima_idx_boillenger = segment['maxima_idx_boillenger']
minima_idx_boillenger = segment['minima_idx_boillenger']
maxima_close_boillenger = segment['maxima_close_boillenger']
minima_close_boillenger = segment['minima_close_boillenger']
if math.isnan(maxima_close_boillenger[-1]) or math.isnan(minima_close_boillenger[-1]):
maxima_close_boillenger[-1] = last_segment['maxima_close_boillenger'][0]
minima_close_boillenger[-1] = last_segment['minima_close_boillenger'][0]
end_boillenger_height = maxima_close_boillenger[-1] - minima_close_boillenger[-1]
maxima_close_boillenger[0] = segment['start_close'] + end_boillenger_height/2
minima_close_boillenger[0] = segment['start_close'] - end_boillenger_height/2
maxima_linregress_boillenger = linregress(maxima_idx_boillenger, maxima_close_boillenger)
minima_linregress_boillenger = linregress(minima_idx_boillenger, minima_close_boillenger)
# Using Boillenger upper and lower AND Local maxima/minima
start_upper = maxima_close_boillenger[0]
end_upper = maxima_close_boillenger[-1]
start_lower = minima_close_boillenger[0]
end_lower = minima_close_boillenger[-1]
maxima_idx_full = [start_index] + [ x for x in maxima if x>=start_index+1 and x<end_index ] + [end_index]
maxima_close_full = [ start_upper if not math.isnan(start_upper) else start_close ] + [ pd_candles.loc[x]['close'] for x in maxima if x>start_index and x<end_index ] + [ end_upper ]
minima_idx_full = [start_index] + [ x for x in minima if x>=start_index+1 and x<end_index ] + [end_index]
minima_close_full = [ start_lower if not math.isnan(start_lower) else start_close ] + [ pd_candles.loc[x]['close'] for x in minima if x>start_index and x<end_index ] + [ end_lower ]
maxima_linregress_full = linregress(maxima_idx_full, maxima_close_full)
minima_linregress_full = linregress(minima_idx_full, minima_close_full)
segment['maxima_linregress_boillenger'] = maxima_linregress_boillenger
segment['minima_linregress_boillenger'] = minima_linregress_boillenger
segment['maxima_linregress_full'] = maxima_linregress_full
segment['minima_linregress_full'] = minima_linregress_full
last_segment = segment
'''
You have five kinds of wedges:
a. Rising parallel
b. Rising converging/diverging
c. Side way
d. Falling parallel
e. Falling converging/diverging
'''
def classify_segment(
segment : Dict,
segment_consolidate_slope_ratio_threshold : float,
sideway_price_condition_threshold : float
):
start_close = segment['start_close']
end_close = segment['end_close']
maxima_close_boillenger = segment['maxima_close_boillenger']
minima_close_boillenger = segment['minima_close_boillenger']
start_height : float = maxima_close_boillenger[0] - minima_close_boillenger[0]
end_height : float = maxima_close_boillenger[-1] - minima_close_boillenger[-1]
upper_slope = segment['maxima_linregress_boillenger'].slope
lower_slope = segment['minima_linregress_boillenger'].slope
is_parallel : bool = abs((upper_slope/lower_slope) - 1) > segment_consolidate_slope_ratio_threshold
is_rising : bool = end_close > start_close
is_sideway : bool = abs((start_close/end_close) - 1) < sideway_price_condition_threshold
is_converging : bool = start_height > end_height and start_height/end_height > 2
is_diverging : bool = end_height > start_height and end_height/start_height > 2
if is_sideway:
segment['class'] = 'sideway'
if is_converging:
segment['class'] = 'sideway_converging'
elif is_diverging:
segment['class'] = 'sideway_diverging'
else:
if is_rising:
if is_parallel:
segment['class'] = 'rising_parallel'
else:
if is_converging:
segment['class'] = 'rising_converging'
elif is_diverging:
segment['class'] = 'rising_diverging'
else:
segment['class'] = 'rising_parallel'
else:
if is_parallel:
segment['class'] = 'falling_parallel'
else:
if is_converging:
segment['class'] = 'falling_converging'
elif is_diverging:
segment['class'] = 'falling_diverging'
else:
segment['class'] = 'falling_parallel'
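# Worked example (illustrative numbers): start_close=60_000 vs end_close=61_000 differ by ~1.6%, below
# sideway_price_condition_threshold=0.05, so the segment is 'sideway'; if the channel height
# (boillenger upper minus lower) also narrows from 3_000 to 1_200 (ratio > 2), it becomes 'sideway_converging'.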
for segment in consolidated_segements:
classify_segment(segment, segment_consolidate_slope_ratio_threshold, sideway_price_condition_threshold)
return {
'minima' : minima,
'maxima' : maxima,
'segments' : consolidated_segements
}
_ticker = ticker.split(":")[0].replace("/","")
target_candle_file_name : str = f'{_ticker}_candles_{dt_start.strftime("%Y-%m-%d-%H-%M-%S")}_{dt_end.strftime("%Y-%m-%d-%H-%M-%S")}_{ts_candle_size}.csv'
if not os.path.isfile(target_candle_file_name):
candles = fetch_candles(
cutoff_ts=dt_start.timestamp(), test_end_date_ts=dt_end.timestamp(),
exchange=target_exchange,
normalized_symbols=[ticker],
candle_size = ts_candle_size,
how_many_candles = ts_how_many_candles,
num_candles_limit=num_candles_limit,
logger=logger,
cache_dir=CACHE_CANDLES
)
pd_candles = candles[ticker]
compute_candles_stats(pd_candles, boillenger_std_multiples, sliding_window_how_many_candles)
pd_candles.to_csv(target_candle_file_name)
else:
pd_candles = pd.read_csv(target_candle_file_name)
fix_column_types(pd_candles)
# ATH (All Time High)
ath_global_index = pd_candles['close'].idxmax()
ath_global_price = pd_candles['close'].max()
result = partition_sliding_window(
pd_candles = pd_candles,
sliding_window_how_many_candles = sliding_window_how_many_candles,
smoothing_window_size_ratio = smoothing_window_size_ratio,
linregress_stderr_threshold = linregress_stderr_threshold,
max_recur_depth = max_recur_depth,
min_segment_size_how_many_candles = min_segment_size_how_many_candles,
segment_consolidate_slope_ratio_threshold = segment_consolidate_slope_ratio_threshold,
sideway_price_condition_threshold = sideway_price_condition_threshold
)
minima = result['minima']
maxima = result['maxima']
segments = result['segments']
pd_segments = pd.DataFrame(
[
{
'start' : segment['start'],
'end' : segment['end'],
'start_datetime' : segment['start_datetime'],
'end_datetime' : segment['end_datetime'],
'start_close' : segment['start_close'],
'end_close' : segment['end_close'],
'window_size_num_intervals' : segment['window_size_num_intervals'],
'cur_recur_depth' : segment['cur_recur_depth'],
'up_or_down' : segment['up_or_down'],
'class' : segment['class'],
'maxima_linregress_slope' : segment['maxima_linregress_full'].slope,
'maxima_linregress_intercept' : segment['maxima_linregress_full'].intercept,
'maxima_linregress_std_err' : segment['maxima_linregress_full'].stderr,
'minima_linregress_slope' : segment['minima_linregress_full'].slope,
'minima_linregress_intercept' : segment['minima_linregress_full'].intercept,
'minima_linregress_std_err' : segment['minima_linregress_full'].stderr
}
for segment in segments ]
)
import matplotlib.gridspec as gridspec
fig = plt.figure(figsize=(15, 15), facecolor='black')
gs = gridspec.GridSpec(3, 1, height_ratios=[2, 0.25, 1.25])
# Price Chart
ax0 = plt.subplot(gs[0])
ax0.plot(pd_candles['datetime'], pd_candles['close'], label='Close', color='dodgerblue')
ax0.plot(pd_candles['datetime'], pd_candles['smoothed_close'], label='Smoothed Close', color='yellow')
ax0.plot(pd_candles['datetime'], pd_candles['ema_close'], label='Long EMA', linestyle='--', color='orange')
ax0.fill_between(pd_candles['datetime'], pd_candles['close'], pd_candles['ema_close'], where=(pd_candles['close'] > pd_candles['ema_close']), interpolate=True, color='dodgerblue', alpha=0.3, label='Bull Market')
ax0.fill_between(pd_candles['datetime'], pd_candles['close'], pd_candles['ema_close'], where=(pd_candles['close'] <= pd_candles['ema_close']), interpolate=True, color='red', alpha=0.3, label='Bear Market')
ax0.set_title('Close vs EMA', color='white')
ax0.set_xlabel('Date', color='white')
ax0.set_ylabel('Price', color='white')
legend = ax0.legend()
legend.get_frame().set_facecolor('black')
legend.get_frame().set_edgecolor('white')
for text in legend.get_texts():
text.set_color('white')
ax0.plot(pd_candles['datetime'][ath_global_index], ath_global_price, marker='x', markersize=10, color='red', label='ATH Global')
# @CRITICAL close vs smoothed_close and merge_distance
for maxima_index in maxima:
ax0.plot(pd_candles['datetime'][maxima_index], pd_candles['close'][maxima_index], marker='+', markersize=8, color='yellow', label='maxima')
for minima_index in minima:
ax0.plot(pd_candles['datetime'][minima_index], pd_candles['close'][minima_index], marker='o', markersize=5, color='yellow', label='minima')
for segment in segments:
ax0.axvline(x=pd_candles['datetime'][segment['end']], color='gray', linewidth=2, linestyle='--')
if segment['maxima_linregress_boillenger'] is not None:
'''
We don't need to compute y_series like this:
slope_maxima = segment['maxima_linregress_boillenger'].slope
intercept_maxima = segment['maxima_linregress_boillenger'].intercept
segment_maxima_dates = pd_candles['datetime'][segment['maxima_idx_boillenger']]
y_series = [ slope_maxima * idx + intercept_maxima for idx in segment['maxima_idx_boillenger'] ]
The syntax above is kept just for reference.
'''
x_series = [ pd_candles.loc[idx]['datetime'] for idx in segment['maxima_idx_boillenger'] ] # x = dates
y_series = [ x for x in segment['maxima_close_boillenger'] ] # y = boillenger upper
ax0.plot(
x_series,
y_series,
color='green', linestyle='--', label='Maxima Linear Regression')
if segment['minima_linregress_boillenger'] is not None:
x_series = [ pd_candles.loc[idx]['datetime'] for idx in segment['minima_idx_boillenger'] ] # x = dates
y_series = [ x for x in segment['minima_close_boillenger'] ] # y = boillenger lower
ax0.plot(
x_series,
y_series,
color='red', linestyle='--', label='Minima Linear Regression')
# Volatility Chart
ax1 = plt.subplot(gs[1], sharex=ax0)
ax1.plot(pd_candles['datetime'], pd_candles['std'], label='Volatility', color='purple')
ax1.set_xlabel('Date', color='white')
ax1.set_ylabel('Volatility', color='white')
# EMA Close Slope Chart (Bar Chart)
ax2 = plt.subplot(gs[2], sharex=ax0)
ax2.bar(pd_candles['datetime'], pd_candles['ema_long_slope'], color='cyan', label='EMA Close Slope')
ax2.set_xlabel('Date', color='white')
ax2.set_ylabel('EMA Close Slope', color='white')
ax0.set_facecolor('black')
ax1.set_facecolor('black')
ax2.set_facecolor('black')
ax0.tick_params(axis='x', colors='white')
ax0.tick_params(axis='y', colors='white')
ax1.tick_params(axis='x', colors='white')
ax1.tick_params(axis='y', colors='white')
ax2.tick_params(axis='x', colors='white')
ax2.tick_params(axis='y', colors='white')
# Show the plot
plt.grid(True)
plt.tight_layout()
plt.show()