Last active
May 23, 2021 06:51
-
-
Save binhna/7f3f30e10ff0c823ac13ea3ecde9f88a to your computer and use it in GitHub Desktop.
backtest wavetrend
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import talib | |
import pandas as pd | |
import glob | |
from tqdm import tqdm | |
def cal_wavetrend(df): | |
df['hlc3'] = (df['high']+df['low']+df['close'])/3 | |
df['esa'] = talib.EMA(df.hlc3, 10) | |
df['d'] = talib.EMA(abs(df.hlc3 - df.esa), 10) | |
df['ci'] = (df['hlc3'] - df['esa']) / (0.015*df['d']) | |
df['tci'] = talib.EMA(df.ci, 21) | |
df['wt2'] = talib.SMA(df.tci, 4) | |
sn = ['']*df.shape[0] | |
for index, row in df.iterrows(): | |
if index-1 > 0: | |
last_row = df.loc[[index-1]] | |
if (row['tci'] <= row['wt2'] and row['tci'] > 53 and row['wt2'] > 53) and \ | |
(last_row['tci'].item() >= last_row['wt2'].item() and \ | |
last_row['tci'].item() > 53 and last_row['wt2'].item() > 53): | |
#short | |
sn[index] = -1 | |
elif (row['tci'] >= row['wt2'] and row['tci'] < -53 and row['wt2'] < -53) and \ | |
(last_row['tci'].item() <= last_row['wt2'].item() and \ | |
last_row['tci'].item() < -53 and last_row['wt2'].item() < -53): | |
#long | |
sn[index] = 1 | |
df['signal'] = sn | |
df.rename(columns={'tci': 'wt1'}, inplace=True) | |
return df | |
def backtest(file): | |
symbol = file.split("/")[-1].split("-")[0] | |
df = pd.read_csv(file) | |
df = cal_wavetrend(df) | |
signals = df.signal.to_list() | |
high = df.high.to_list() | |
low = df.low.to_list() | |
return_needed = [] | |
for z in [0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 5.5, 6, 6.5, 7, 7.5, 8, 8.5, 9, 9.5, 10]: | |
result = {"long": [], "short": []} | |
tp_rate = sl_rate = z | |
i = 0 | |
while(i < len(signals)): | |
signal = signals[i] | |
if signal == 1: | |
result["long"].append(0) | |
tp_price = df.iloc[i]["close"]*(1+tp_rate/100) | |
sl_price = df.iloc[i]["close"]*(1-sl_rate/100) | |
j = i+1 | |
while(j<len(signals) and signals[j] != -1): | |
if high[j] >= tp_price: | |
result["long"][-1] = 1 | |
break | |
elif low[j] <= sl_price: | |
break | |
j+=1 | |
i = j+1 | |
elif signal == -1: | |
result["short"].append(0) | |
sl_price = df.iloc[i]["close"]*(1+sl_rate/100) | |
tp_price = df.iloc[i]["close"]*(1-tp_rate/100) | |
j = i+1 | |
while(j < len(signals) and signals[j] != -1): | |
if low[j] <= tp_price: | |
result["short"][-1] = 1 | |
break | |
elif high[j] >= sl_price: | |
break | |
j += 1 | |
i = j+1 | |
else: | |
i+=1 | |
long_result = round(sum(result['long'])/len(result['long']), 4) if result['long'] else 0 | |
short_result = round(sum(result['short'])/len(result['short']), 4) if result['short'] else 0 | |
# print(f"LONG: {long_result} | SHORT: {short_result}") | |
return_needed.append(f"{symbol} | {long_result} | {short_result} | {len(result['long'])} | {len(result['short'])} | {tp_rate}") | |
return "\n".join(return_needed) + "\n" | |
if __name__ == "__main__": | |
files = glob.glob("data/spot/*.csv") | |
## single core | |
# for file in files: | |
# symbol = file.split("/")[-1].split("-")[0] | |
# df = pd.read_csv(file) | |
# df = cal_wavetrend(df) | |
# # df.to_csv("BNBUSDT-1h-data.csv", index=False) | |
# # df = pd.read_csv("BNBUSDT-1h-data.csv") | |
# print(symbol, end=" | ") | |
# backtest(df) | |
# multicore | |
import os | |
from multiprocessing import Pool | |
with Pool(os.cpu_count() + 2) as p: | |
result = p.map(backtest, tqdm(files)) | |
with open("backtest.txt", "w") as f: | |
for r in result: | |
f.write(r) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment