Skip to content

Instantly share code, notes, and snippets.

@jweinst1
Last active April 11, 2026 21:50
Show Gist options
  • Select an option

  • Save jweinst1/f2d4e69cef84adf63511da932157e6ee to your computer and use it in GitHub Desktop.

Select an option

Save jweinst1/f2d4e69cef84adf63511da932157e6ee to your computer and use it in GitHub Desktop.
python script for multi column stock CSV and seeing duration of N day RSI zones
import csv
from typing import Dict, List, Optional
import math # only for sqrt in correlation (pure basics)
def is_float(num):
try:
float(num)
return True
except ValueError:
return False
def get_only_floats(arr):
return [float(s) for s in arr if is_float(s)]
def get_closing_prices(path_of_csv):
closing_sets = []
names = []
with open(path_of_csv, 'r', newline='') as f:
reader = csv.reader(f)
first = next(reader)
names = [s for s in first if s != '']
print(names)
for name in names:
closing_sets.append([])
next(reader, None) # discard hedger row
for row in reader:
prices = get_only_floats(row)
assert len(prices) == len(closing_sets)
for i in range(len(prices)):
closing_sets[i].append(prices[i])
return {"names":names, "closing":closing_sets}
def rsi_window(lst, start, end):
diff_pcts = []
for i in range(start + 1, end):
change = lst[i] - lst[i - 1]
diff_pcts.append(change / lst[i - 1])
gains = [max(0, a) for a in diff_pcts]
losses = [max(0, -a) for a in diff_pcts]
avg_gain = sum(gains) / len(gains)
avg_loss = sum(losses) / len(losses)
if avg_loss == 0:
return 100
rel_str = avg_gain / avg_loss
rsi_ind = 100 - (100 / (1 + rel_str))
return round(rsi_ind, 2)
def rsi_to_high_predict(lst, start, end, high=75.0, incr=0.5):
# foo[0:3] + [foo[2]]
cur_rsi = rsi_window(lst, start, end)
if cur_rsi >= high:
None
new_lst = lst[start:end+1] + [lst[end]]
while rsi_window(new_lst, 0, len(new_lst)) < high:
new_lst[len(new_lst)-1] += incr
return new_lst[len(new_lst)-1]
def rsi_to_low_predict(lst, start, end, low=30.0, incr=0.5):
# foo[0:3] + [foo[2]]
cur_rsi = rsi_window(lst, start, end)
if cur_rsi <= low:
None
new_lst = lst[start:end+1] + [lst[end]]
while rsi_window(new_lst, 0, len(new_lst)) > low:
new_lst[len(new_lst)-1] -= incr
return new_lst[len(new_lst)-1]
def next_close_for_rsi(prices, period=14, threshold=30.0, direction="above"):
"""
direction: "above" or "below"
"""
if direction not in ["above", "below"]:
raise ValueError("direction must be 'above' or 'below'")
if len(prices) < period + 1:
raise ValueError(f"Need at least {period + 1} prices")
# Build the last 'period' percentage changes
diff_pcts = []
start = len(prices) - period - 1
for i in range(start + 1, len(prices)):
change = prices[i] - prices[i - 1]
diff_pcts.append(change / prices[i - 1])
sum_gains = sum(max(0, a) for a in diff_pcts)
sum_losses = sum(max(0, -a) for a in diff_pcts)
oldest_pct = diff_pcts[0]
oldest_gain = max(0, oldest_pct)
oldest_loss = max(0, -oldest_pct)
base_gains = sum_gains - oldest_gain
base_losses = sum_losses - oldest_loss
current_price = prices[-1]
if base_losses == 0 and direction == "above":
return current_price
if base_gains == 0 and direction == "below":
return current_price
if direction == "above":
target_rs = threshold / (100.0 - threshold)
r = target_rs * base_losses - base_gains
if r >= 0:
return round(current_price * (1 + r), 2)
else:
return round(current_price, 2)
else: # below
target_rs = (100.0 - threshold) / threshold # inverted
r = base_gains - target_rs * base_losses # will be negative for a drop
if r <= 0:
return round(current_price * (1 + r), 2)
else:
return round(current_price, 2)
def add_rsis_to_closes(close_set, window_len=7):
close_set["rsis"] = []
for i in range(len(close_set["names"])):
name_rsis = []
start = 0
end = window_len
name_end = len(close_set["closing"][i])
while end < name_end:
name_rsis.append(rsi_window(close_set["closing"][i], start, end))
start += 1
end += 1
close_set["rsis"].append(name_rsis)
def get_zone_mark(val, low, high):
if val <= low:
return -1
elif val >= high:
return 1
else:
return 0
# Adds periods spent in each zone of buying
# consider multiple zones TODO
def add_rsi_revert_periods(close_set, low_bound=25.0, high_bound=75.0):
close_set["periods"] = []
for i in range(len(close_set["names"])):
zones = [get_zone_mark(x, low_bound, high_bound) for x in close_set["rsis"][i]]
close_set["periods"].append(zones)
if __name__ == "__main__":
my_closes = get_closing_prices("stock_closes.csv")
add_rsis_to_closes(my_closes, 7)
add_rsi_revert_periods(my_closes, low_bound=30.0, high_bound=70.0)
for elem in my_closes["periods"]:
print(elem)
my_prices = [10.0, 11.0, 12.0, 10.5, 10.1, 9.9, 9.5, 9.5, 9.8, 9.0]
print(rsi_window(my_prices, 3, len(my_prices)-1))
print(rsi_to_high_predict(my_prices, 3, len(my_prices)-1))
print(next_close_for_rsi(my_prices[2:len(my_prices)], 7, 70.0, "above"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment