Last active
April 11, 2026 21:50
-
-
Save jweinst1/f2d4e69cef84adf63511da932157e6ee to your computer and use it in GitHub Desktop.
Python script that reads a multi-column CSV of stock closing prices and marks how long each series spends in N-day RSI zones
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import csv | |
| from typing import Dict, List, Optional | |
| import math # only for sqrt in correlation (pure basics) | |
def is_float(num):
    """Report whether ``num`` can be converted with ``float()``.

    Args:
        num: Any value; typically a string cell read from a CSV row.

    Returns:
        True when ``float(num)`` succeeds, False otherwise. Text such as
        "nan" or "inf" counts as a float because ``float()`` accepts it.
    """
    try:
        float(num)
    except (TypeError, ValueError):
        # ValueError: unparsable text such as "" or "abc".
        # TypeError: values float() cannot take at all, such as None or a list.
        return False
    return True
def get_only_floats(arr):
    """Return the float value of every entry of ``arr`` that ``is_float`` accepts.

    Entries rejected by ``is_float`` are dropped; the order of the remaining
    entries is preserved.
    """
    return list(map(float, filter(is_float, arr)))
def get_closing_prices(path_of_csv):
    """Load per-column closing prices from a CSV file.

    The first row supplies the column names (empty cells are ignored) and the
    second row is discarded. Every remaining non-blank row must contain
    exactly one numeric cell per name; non-numeric cells in a data row are
    ignored. The names are printed as a side effect.

    Args:
        path_of_csv: Path of the CSV file to read.

    Returns:
        A dict with "names" (the list of column names) and "closing" (one
        list of float prices per name, in the same order as "names").

    Raises:
        ValueError: If the file is empty, or a data row does not hold exactly
            one numeric value per name.
    """
    with open(path_of_csv, 'r', newline='') as f:
        reader = csv.reader(f)
        first = next(reader, None)
        if first is None:
            raise ValueError(f"{path_of_csv} is empty; expected a row of names")
        names = [s for s in first if s != '']
        print(names)
        closing_sets = [[] for _ in names]
        next(reader, None)  # discard the second (header) row
        for row in reader:
            if not row:
                # csv.reader yields [] for a blank line, e.g. a trailing newline.
                continue
            prices = get_only_floats(row)
            # Explicit check instead of assert: asserts vanish under `python -O`.
            if len(prices) != len(closing_sets):
                raise ValueError(
                    f"line {reader.line_num}: expected {len(closing_sets)} "
                    f"numeric values, found {len(prices)}"
                )
            for series, price in zip(closing_sets, prices):
                series.append(price)
    return {"names": names, "closing": closing_sets}
def rsi_window(lst, start, end):
    """Compute an RSI-style reading over the prices at indices start..end-1.

    Each step's change is expressed as a fraction of the previous price.
    Gains and losses are averaged over every step in the window and combined
    as ``100 - 100 / (1 + avg_gain / avg_loss)``, rounded to two decimals.

    Returns 100 when the window contains no losses.
    """
    pct_moves = [
        (lst[k] - lst[k - 1]) / lst[k - 1]
        for k in range(start + 1, end)
    ]
    steps = len(pct_moves)
    mean_up = sum(max(0, move) for move in pct_moves) / steps
    mean_down = sum(max(0, -move) for move in pct_moves) / steps
    if mean_down == 0:
        return 100
    strength = mean_up / mean_down
    return round(100 - (100 / (1 + strength)), 2)
def rsi_to_high_predict(lst, start, end, high=75.0, incr=0.5):
    """Estimate the next close that would lift the RSI to ``high``.

    The prices ``lst[start]`` through ``lst[end]`` are copied and a
    hypothetical next close, seeded with ``lst[end]``, is appended. That
    close is raised in steps of ``incr`` until ``rsi_window`` over the copy
    reaches ``high``.

    Args:
        lst: Sequence of prices.
        start: Index of the first price in the window.
        end: Index of the most recent price in the window.
        high: RSI level to reach.
        incr: Price step used while searching; must be positive.

    Returns:
        The hypothetical close that first reaches ``high``, or None when
        ``rsi_window(lst, start, end)`` is already at or above ``high``.

    Raises:
        ValueError: If ``incr`` is not positive or ``high`` exceeds 100,
            since the search could then never terminate.
    """
    if incr <= 0:
        raise ValueError("incr must be positive")
    if high > 100:
        raise ValueError("high must not exceed 100")
    # NOTE(review): this check excludes lst[end] (rsi_window's end is
    # exclusive) while the probe below includes it -- confirm that is intended.
    if rsi_window(lst, start, end) >= high:
        return None
    probe = lst[start:end + 1] + [lst[end]]
    while rsi_window(probe, 0, len(probe)) < high:
        probe[-1] += incr
    return probe[-1]
def rsi_to_low_predict(lst, start, end, low=30.0, incr=0.5):
    """Estimate the next close that would drop the RSI to ``low``.

    The prices ``lst[start]`` through ``lst[end]`` are copied and a
    hypothetical next close, seeded with ``lst[end]``, is appended. That
    close is lowered in steps of ``incr`` until ``rsi_window`` over the copy
    falls to ``low``.

    Args:
        lst: Sequence of prices.
        start: Index of the first price in the window.
        end: Index of the most recent price in the window.
        low: RSI level to reach.
        incr: Price step used while searching; must be positive.

    Returns:
        The hypothetical close that first reaches ``low``, or None when
        ``rsi_window(lst, start, end)`` is already at or below ``low``.

    Raises:
        ValueError: If ``incr`` is not positive or ``low`` is negative,
            since the search could then never terminate.
    """
    if incr <= 0:
        raise ValueError("incr must be positive")
    if low < 0:
        raise ValueError("low must not be negative")
    # NOTE(review): this check excludes lst[end] (rsi_window's end is
    # exclusive) while the probe below includes it -- confirm that is intended.
    if rsi_window(lst, start, end) <= low:
        return None
    probe = lst[start:end + 1] + [lst[end]]
    while rsi_window(probe, 0, len(probe)) > low:
        probe[-1] -= incr
    return probe[-1]
def next_close_for_rsi(prices, period=14, threshold=30.0, direction="above"):
    """Solve for the next close that moves the RSI to ``threshold``.

    Once the next close arrives, the RSI window holds the most recent
    ``period - 1`` percentage changes plus the change ``r`` made by the new
    close; the oldest of the current ``period`` changes drops out. With G and
    L the summed gains and losses of the retained changes and
    RS = threshold / (100 - threshold):

    * "above": r is a gain satisfying (G + r) / L = RS.
    * "below": r is a loss satisfying G / (L - r) = RS.

    Args:
        prices: Sequence of closes, oldest first; needs ``period + 1`` items.
        period: Number of percentage changes in the RSI window (at least 1).
        threshold: RSI level to reach.
        direction: "above" to rise to the threshold, "below" to fall to it.

    Returns:
        The required next close rounded to two decimals. When the RSI is
        already on the requested side of ``threshold`` the latest close is
        returned instead.

    Raises:
        ValueError: If ``direction`` is unknown, ``period`` is below 1, there
            are too few prices, or the threshold cannot be reached
            (``threshold >= 100`` for "above", ``threshold <= 0`` for "below").
    """
    if direction not in ("above", "below"):
        raise ValueError("direction must be 'above' or 'below'")
    if period < 1:
        raise ValueError("period must be at least 1")
    if len(prices) < period + 1:
        raise ValueError(f"Need at least {period + 1} prices")

    # The last `period` percentage changes, oldest first.
    first_idx = len(prices) - period
    diff_pcts = [
        (prices[i] - prices[i - 1]) / prices[i - 1]
        for i in range(first_idx, len(prices))
    ]

    # The oldest change leaves the window when the next close is added.
    retained = diff_pcts[1:]
    base_gains = sum(max(0, a) for a in retained)
    base_losses = sum(max(0, -a) for a in retained)
    current_price = prices[-1]

    if direction == "above":
        if base_losses == 0:
            # No retained losses: the window cannot sit below any threshold.
            return current_price
        if threshold >= 100:
            raise ValueError("threshold must be below 100 for direction 'above'")
        target_rs = threshold / (100.0 - threshold)
        r = target_rs * base_losses - base_gains
        reachable = r >= 0  # a negative r means the RSI is already above
    else:
        if base_gains == 0:
            # No retained gains: the window cannot sit above any threshold.
            return current_price
        if threshold <= 0:
            raise ValueError("threshold must be above 0 for direction 'below'")
        inverse_rs = (100.0 - threshold) / threshold
        # From G / (L - r) = RS: the needed loss is G / RS - L, and r is its negative.
        r = base_losses - inverse_rs * base_gains
        reachable = r <= 0  # a positive r means the RSI is already below

    if reachable:
        return round(current_price * (1 + r), 2)
    return round(current_price, 2)
def add_rsis_to_closes(close_set, window_len=7):
    """Attach rolling RSI readings to ``close_set`` under the "rsis" key.

    For every name, ``rsi_window`` is evaluated over each run of
    ``window_len`` consecutive closes, sliding one close at a time from the
    start of the series through the window that ends on the final close.

    Args:
        close_set: Dict holding "names" and a parallel "closing" list of
            price series. It is mutated in place; nothing is returned.
        window_len: Number of closes per window; at least 2 so that each
            window contains at least one price change.

    Raises:
        ValueError: If ``window_len`` is below 2.
    """
    if window_len < 2:
        raise ValueError("window_len must be at least 2")
    close_set["rsis"] = []
    for i in range(len(close_set["names"])):
        series = close_set["closing"][i]
        # rsi_window treats its end index as exclusive, so the last window
        # starts at len(series) - window_len and ends at len(series); stopping
        # one earlier would leave the most recent close out of every reading.
        name_rsis = [
            rsi_window(series, first, first + window_len)
            for first in range(len(series) - window_len + 1)
        ]
        close_set["rsis"].append(name_rsis)
def get_zone_mark(val, low, high):
    """Classify ``val`` against two bounds.

    Returns -1 when ``val`` is at or below ``low``, 1 when it is at or above
    ``high``, and 0 otherwise. The low bound is checked first.
    """
    if val <= low:
        return -1
    if val >= high:
        return 1
    return 0
def add_rsi_revert_periods(close_set, low_bound=25.0, high_bound=75.0):
    """Label every RSI reading with its zone and store the labels under "periods".

    Each value in ``close_set["rsis"][i]`` is passed through ``get_zone_mark``
    with the given bounds, producing one list of zone marks per name.
    ``close_set`` is mutated in place.

    TODO: consider multiple zones.
    """
    periods = close_set["periods"] = []
    for idx, _name in enumerate(close_set["names"]):
        marks = []
        for reading in close_set["rsis"][idx]:
            marks.append(get_zone_mark(reading, low_bound, high_bound))
        periods.append(marks)
if __name__ == "__main__":
    # Load the closes, derive 7-close RSI readings, then label each reading's zone.
    closes = get_closing_prices("stock_closes.csv")
    add_rsis_to_closes(closes, 7)
    add_rsi_revert_periods(closes, low_bound=30.0, high_bound=70.0)
    for zone_marks in closes["periods"]:
        print(zone_marks)

    # Small hand-made series for exercising the RSI helpers.
    sample = [10.0, 11.0, 12.0, 10.5, 10.1, 9.9, 9.5, 9.5, 9.8, 9.0]
    last_idx = len(sample) - 1
    print(rsi_window(sample, 3, last_idx))
    print(rsi_to_high_predict(sample, 3, last_idx))
    print(next_close_for_rsi(sample[2:], 7, 70.0, "above"))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment