Last active
April 11, 2025 15:49
-
-
Save FlyingFathead/e6647c743373874ff69384bdedd2ff08 to your computer and use it in GitHub Desktop.
VIX (CBOE Volatility Index) ticker-tracker
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# vix_ticker.py
#
# A simple Python program that fetches the Chicago Board Options Exchange's
# CBOE Volatility Index, also known as the VIX index.
#
# It polls the index via Yahoo Finance and triggers warnings when the index
# spikes, e.g. in case of market panic or other volatility; useful knowledge
# for many.
#
# By FlyingFathead (https://github.com/FlyingFathead)
# (Apr 11 2025)
################################################################################
# CONFIG / CONSTANTS
################################################################################
# When True, a short text-to-speech announcement is spoken once at startup.
TTS_SYSTEM_TEST = True # <-- Set True or False as desired
# Default pause between polls; used unless --interval is given on the command line.
DEFAULT_POLLING_INTERVAL = 30 # seconds; e.g., 300 seconds = 5 minutes
############################################################################### | |
# Custom Logging with Local Timezone | |
############################################################################### | |
import logging | |
import time | |
import warnings | |
import math | |
import argparse | |
import datetime | |
# tz formatter | |
def get_tz_abbreviation(dt):
    """Return a short timezone label for *dt* as seen in the local timezone.

    The datetime is converted with ``astimezone()`` and its ``tzname()`` is
    looked up in a small override table; a name without an override is
    returned unchanged.
    """
    # Long-form timezone names that are replaced by a short abbreviation.
    # Customize this table as needed.
    overrides = {
        "FLE Daylight Time": "EEST",
        "FLE Standard Time": "EET",
    }
    local_name = dt.astimezone().tzname()
    if local_name in overrides:
        return overrides[local_name]
    return local_name
# Custom formatter that adds local timezone name | |
class TZFormatter(logging.Formatter):
    """Log formatter whose ``%(asctime)s`` ends with a local timezone label."""

    def formatTime(self, record, datefmt=None):
        """Render the record's creation time followed by a timezone label.

        Args:
            record: The log record being formatted; only ``record.created``
                (a timestamp in seconds) is read.
            datefmt: Optional ``strftime`` pattern. When empty or None,
                ``"%Y-%m-%d %H:%M:%S"`` is used.

        Returns:
            The formatted time, a space, and the label returned by
            ``get_tz_abbreviation``.
        """
        # Convert the record's timestamp (in seconds) to a datetime object.
        created = datetime.datetime.fromtimestamp(record.created)
        stamp = created.strftime(datefmt or "%Y-%m-%d %H:%M:%S")
        # Derive the label from the record's own timestamp rather than from
        # "now": otherwise a record created just before a DST switch could be
        # tagged with the abbreviation in effect when it is formatted.
        tz_abbr = get_tz_abbreviation(created)
        return f"{stamp} {tz_abbr}"
# Configure logging: every record goes to both a log file and the console,
# formatted by TZFormatter so each timestamp carries the timezone label.
formatter = TZFormatter("%(asctime)s - %(levelname)s - %(message)s")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
# The log messages contain emoji and other non-ASCII characters. Without an
# explicit encoding the file is opened with the platform default (for example
# a legacy codepage on Windows), which cannot encode them, so the handler
# would fail to write those records.
file_handler = logging.FileHandler("vix_spike_watch.log", encoding="utf-8")
file_handler.setFormatter(formatter)
logging.basicConfig(
    level=logging.INFO,
    handlers=[file_handler, stream_handler],
)
################################################################################ | |
# INITIAL STARTUP / INSTALL MISSING PIP PACKAGES IF NEEDED | |
################################################################################ | |
import importlib | |
import importlib.util | |
import subprocess | |
import sys | |
# List of required packages (only external ones).
# Each name is used both as the `pip install` argument and as the module name
# probed to decide whether the package is already installed.
required_packages = [
    "yfinance",
    "pyttsx3"
]
# Function to install missing packages | |
def ensure_packages_installed(packages):
    """
    Make sure every package named in *packages* can be found.

    A package that is already available is only logged. A missing one is
    installed via pip and then probed again; if it still cannot be found,
    an error is logged and the process exits with status 1.
    """
    for package_name in packages:
        if is_package_installed(package_name):
            logging.info(f"Package '{package_name}' is already installed.")
            continue
        logging.info(f"Package '{package_name}' not found. Installing now...")
        install_package(package_name)
        # Probe again after installing to confirm the package is now visible.
        if not is_package_installed(package_name):
            logging.error(f"Failed to import '{package_name}' even after installation.")
            sys.exit(1)
def is_package_installed(package_name):
    """Return True if the given package can be located, False otherwise.

    Args:
        package_name: Module name to look up; may be dotted.

    Returns:
        True when ``importlib.util.find_spec`` finds a spec for the name,
        False when it finds none or the lookup itself fails.
    """
    try:
        return importlib.util.find_spec(package_name) is not None
    except (ImportError, ValueError):
        # find_spec imports the parents of a dotted name and raises
        # ModuleNotFoundError (an ImportError) when one is missing; it raises
        # ValueError when an already-imported module has no usable spec.
        # Either way the package is not importable as requested.
        return False
def install_package(package_name):
    """Install the given package with pip, run under the current interpreter.

    On success the import caches are invalidated so the new package becomes
    visible. If pip exits with a non-zero status, the failure is logged and
    the process exits with status 1.
    """
    pip_command = [sys.executable, "-m", "pip", "install", package_name]
    try:
        subprocess.check_call(pip_command)
    except subprocess.CalledProcessError as e:
        logging.error(f"Installation of package '{package_name}' failed: {e}")
        sys.exit(1)
    else:
        # Invalidate import caches so Python sees the newly installed package.
        importlib.invalidate_caches()
# Actually ensure the packages are installed.
# This call must stay ahead of the third-party imports below: it installs
# whatever is missing (or exits the process) before they are attempted.
ensure_packages_installed(required_packages)
# Now we can safely import the libraries
import yfinance as yf
import pyttsx3
# Ignore FutureWarning-category warnings from this point on.
warnings.simplefilter(action='ignore', category=FutureWarning)
################################################################################ | |
# CMDLINE ARGS | |
################################################################################ | |
def _positive_seconds(value):
    """argparse ``type`` callback: parse *value* as a whole number of seconds > 0.

    Raises:
        argparse.ArgumentTypeError: if *value* is not an integer or is not
            strictly positive. argparse turns this into a usage error.
    """
    try:
        seconds = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if seconds <= 0:
        # The watcher loop passes this value to time.sleep(): zero would poll
        # without any pause and a negative value makes time.sleep() raise.
        raise argparse.ArgumentTypeError(
            f"interval must be a positive number of seconds, got {value!r}"
        )
    return seconds


# Parse the command line once at startup; --interval overrides the default
# polling interval.
parser = argparse.ArgumentParser()
parser.add_argument("--interval", type=_positive_seconds, default=DEFAULT_POLLING_INTERVAL,
                    help="Polling interval in seconds")
args = parser.parse_args()
POLLING_INTERVAL = args.interval
################################################################################ | |
# TTS | |
################################################################################ | |
# Speech-friendly names for ticker symbols, used when building the spoken
# alert message; a symbol without an entry is spoken as the raw symbol string.
SPOKEN_NAME = {
    "^VXIND1": "the VIX near-term futures",
    "^VIX": "the spot VIX index",
    "^DJI": "the Dow Jones index",
}
def speak(text):
    """Say *text* out loud through a freshly initialised pyttsx3 engine.

    Any exception raised while initialising or driving the engine is logged
    as an error and not propagated to the caller.
    """
    try:
        tts = pyttsx3.init()
        tts.say(text)
        # Block until the queued utterance has been processed, then stop.
        tts.runAndWait()
        tts.stop()
    except Exception as e:
        logging.error(f"Text-to-speech error: {e}")
################################################################################ | |
# Alert Functions | |
################################################################################ | |
def run_custom_script(symbol, threshold_label, current_value, threshold):
    """
    Log and speak an alert for a threshold that has been crossed.

    The spoken message contains:
    - the 'threshold_label' (e.g. "10% Spike")
    - the baseline stored for the symbol in ``baseline_dict``
    - the current value
    - the actual percentage difference between the two
    plus an extra remark once the actual difference reaches 20 percent or more.
    ``threshold`` is only used in the log line.
    """
    logging.info(
        f"[run_custom_script] {symbol}: {threshold_label} threshold reached. "
        f"Current: {current_value:.2f}, Threshold: {threshold:.2f}"
    )
    # Report the real distance from the baseline (in %), not the nominal
    # threshold that happened to fire.
    baseline = baseline_dict[symbol]
    actual_spike_pct = (current_value / baseline - 1.0) * 100.0
    # Fall back to the raw symbol when no friendly name is configured.
    friendly_symbol = SPOKEN_NAME.get(symbol, symbol)
    msg = (
        f"Attention! {friendly_symbol} triggered {threshold_label}. "
        f"Baseline was {baseline:.2f}, and it's now {current_value:.2f}, "
        f"a total spike of {actual_spike_pct:.2f} percent from baseline."
    )
    # Extra remarks, highest cutoff first; only the first one reached is used.
    escalations = (
        (90, " DROP THE BOMB!!! EXTERMINATE THEM ALL!!!"),
        (80, " WE'RE DOOMED!!! THIS IS THE END!!! BYE GUYS!!!"),
        (60, " YAHOO-HO-HO-HOO!!! LOOKS LIKE WE'RE GONE AND NEVER COMING BACK!!!"),
        (50, " Oh, wow! Looks like we're pretty much screwed! Everyone, run for the hills!"),
        (40, " System meltdown is imminent! The index is now in panic territory!"),
        (30, " This is a red alert scenario! Conditions are extremely volatile!"),
        (20, " Warning: volatility is approaching meltdown levels!"),
    )
    for cutoff, remark in escalations:
        if actual_spike_pct >= cutoff:
            msg += remark
            break
    # Log the message before speaking it.
    logging.info(f"[TTS Message] {msg}")
    speak(msg)
################################################################################ | |
# Setup | |
################################################################################ | |
# tickers = ["^VXIND1", "^VIX"]
# Symbols to watch.
tickers = ["^VIX"]
# symbol -> baseline close fetched at startup; alert thresholds and the
# reported spike percentage are computed from it.
baseline_dict = {}
# symbol -> most recently seen value; used for the tick-to-tick change and as
# the fallback when a fetch yields NaN.
previous_values = {}
# --- NEW: Determine yfinance interval string --- | |
def get_yf_interval(seconds):
    """Translate a polling interval in seconds into a yfinance interval string.

    Anything shorter than a minute, and any value without an exact match in
    the table below, yields "1m" after logging a warning.
    """
    if seconds < 60:
        logging.warning(f"Polling interval {seconds}s is less than 1 minute. Using '1m' for yfinance data.")
        return "1m"
    # Polling intervals that correspond exactly to an interval string.
    exact_matches = {
        60: "1m",
        120: "2m",
        300: "5m",
        900: "15m",
        1800: "30m",
        3600: "60m",  # or "1h"
        5400: "90m",
    }
    interval = exact_matches.get(seconds)
    if interval is not None:
        return interval
    # Default or fallback for unsupported intervals - adjust as needed.
    logging.warning(f"Polling interval {seconds}s does not map directly to a standard yfinance interval < 1 day. Using '1m'. Adjust code if needed.")
    return "1m"  # Or raise an error, or choose another default like '5m'
# Resolve the yfinance data interval once, from the configured polling
# interval; the watcher loop passes this string to every download.
YF_INTERVAL_STRING = get_yf_interval(POLLING_INTERVAL)
# Display the chosen interval
logging.info(f"Using polling interval: {POLLING_INTERVAL}s")
logging.info(f"Using yfinance data interval: {YF_INTERVAL_STRING}")
def get_baseline(ticker):
    """
    Fetch history from yfinance and return the *previous trading day's*
    close as the baseline.

    Args:
        ticker: Symbol to look up, e.g. "^VIX".

    Returns:
        The baseline close as a float, or None (after logging a critical
        message) when it cannot be determined. The caller is expected to
        exit in that case.
    """
    t = yf.Ticker(ticker)
    # Fetch slightly more history than strictly needed (7 days) to be robust
    # against weekends and holidays.
    info = t.history(period="7d")
    if info is None or len(info) < 2:
        logging.critical(
            f"Insufficient historical data for {ticker} (need at least 2 days). "
            f"Cannot determine previous day's close. Exiting."
        )
        return None
    try:
        # Rows without a close cannot serve as a baseline, so skip them
        # instead of giving up when one happens to sit at the chosen position.
        closes = info["Close"].dropna()
        if closes.empty:
            logging.critical(
                f"No valid closing prices for {ticker}. "
                f"Cannot set baseline. Exiting."
            )
            return None
        newest = closes.index[-1]
        # Compare calendar dates in the index's own timezone (a naive index
        # is compared against local time).
        today = datetime.datetime.now(tz=newest.tzinfo).date()
        # If the newest row belongs to today's session, the previous trading
        # day is the row before it. If today has no row yet (pre-market,
        # weekend, holiday), the newest row already IS the previous trading
        # day; stepping back one more would yield a baseline two sessions old.
        offset = -2 if newest.date() >= today else -1
        if len(closes) < -offset:
            logging.critical(
                f"Insufficient valid closes for {ticker} "
                f"(have {len(closes)}). Cannot set baseline. Exiting."
            )
            return None
        prev_day_close = float(closes.iloc[offset])
        prev_day_date = closes.index[offset].strftime("%Y-%m-%d")  # Just the date is enough
        logging.info(
            f"Fetched baseline for {ticker}: {prev_day_close:.2f} "
            f"(from previous trading day's close: {prev_day_date})"
        )
        return prev_day_close
    except Exception as e:
        # Startup boundary: report the problem and let the caller decide.
        logging.critical(
            f"Unexpected error getting baseline for {ticker}: {e}. Exiting."
        )
        return None
# Fetch baselines & set previous values
for sym in tickers:
    baseline = get_baseline(sym)
    if baseline is None:
        # get_baseline has already logged the reason. Use sys.exit like the
        # rest of this file: the bare exit() helper is added by the site
        # module and is not guaranteed to exist in every interpreter setup.
        sys.exit(1)
    baseline_dict[sym] = baseline
    previous_values[sym] = baseline  # seed the fallback with baseline
# Define thresholds
# Per symbol: (alert name, multiplier applied to the symbol's baseline).
_spike_levels = {
    # "^VXIND1": (
    #     ("5% Spike", 1.05),
    #     ("10% Spike", 1.10),
    #     ("20% Spike", 1.20),
    # ),
    "^VIX": (
        ("5% Spike", 1.05),
        ("10% Spike", 1.10),
        ("15% Spike", 1.15),
        ("20% Spike", 1.20),
        ("25% Spike", 1.25),
        ("30% Spike", 1.30),
        ("40% Spike", 1.40),
        ("50% Spike", 1.50),
    ),
}
# symbol -> list of alert descriptors. Each descriptor holds the alert name,
# the absolute threshold, the callable to run when it is crossed, and an
# "enabled" switch.
alert_levels = {
    symbol: [
        {
            "name": label,
            "threshold": baseline_dict[symbol] * factor,
            "action": run_custom_script,
            "enabled": True,
        }
        for label, factor in levels
    ]
    for symbol, levels in _spike_levels.items()
}
# Track triggered alerts so we don't spam repeatedly.
triggered_alerts = {"^VXIND1": set(), "^VIX": set()}
# Also give every watched symbol its own set, so that extending `tickers`
# cannot make the watcher loop fail with a KeyError on this dict.
for sym in tickers:
    triggered_alerts.setdefault(sym, set())
logging.info("Starting VIX watchers...")
for sym in tickers:
    logging.info(f"Baseline for {sym}: {baseline_dict[sym]:.2f}")
    # A watched symbol without configured alert levels simply has none to list.
    for lvl in alert_levels.get(sym, ()):
        if lvl["enabled"]:
            logging.info(f" {sym} -> {lvl['name']}: {lvl['threshold']:.2f}")
# 1) Run a short TTS test at script startup (only if TTS_SYSTEM_TEST = True)
if TTS_SYSTEM_TEST:
    speak("Volatility index monitoring started.")
################################################################################ | |
# Main Watcher Loop | |
################################################################################ | |
# Poll forever: download the latest data, log each symbol's move since the
# previous poll, fire every alert level that is newly crossed, then sleep.
while True:
    try:
        # data = yf.download(tickers, period="1d", interval="5m", progress=False)
        data = yf.download(tickers, period="1d", interval=YF_INTERVAL_STRING, progress=False)
        if data.empty:
            logging.warning(f"No data returned for tickers using interval {YF_INTERVAL_STRING}. Retrying...")
        else:
            for sym in tickers:
                # Make sure we have 'Close' data for the symbol.
                has_close = ("Close" in data.columns) and (sym in data["Close"].columns)
                if not has_close:
                    logging.warning(f"No 'Close' data found for {sym}. Skipping.")
                    continue

                current_vix = data["Close"][sym].iloc[-1]
                if math.isnan(current_vix):
                    # No new data: reuse the last known value if there is one.
                    if math.isnan(previous_values[sym]):
                        logging.warning(f"{sym}: No data & no prior fallback. Skipping.")
                        continue
                    logging.warning(
                        f"{sym}: No new data in last fetch (interval: {YF_INTERVAL_STRING}). Using fallback: {previous_values[sym]:.2f}"
                    )
                    current_vix = previous_values[sym]

                # NOTE(review): the log line below labels this timestamp as
                # UTC; confirm that the index returned by yfinance is in UTC.
                current_time = data.index[-1].strftime("%Y-%m-%d %H:%M:%S")
                prev_value = previous_values[sym]
                difference = current_vix - prev_value
                if prev_value != 0:
                    pct_change = difference / prev_value * 100
                else:
                    pct_change = 0

                if difference > 0:
                    arrow = "🔺"
                elif difference < 0:
                    arrow = "🔻"
                else:
                    arrow = "➡️"

                logging.info(
                    f"{sym}: {current_vix:.2f} {arrow} "
                    f"({difference:+.2f} pts, {pct_change:+.2f}%) "
                    f"(data timestamp [UTC]: {current_time})"
                )
                # Store the new value as previous for the next iteration.
                previous_values[sym] = current_vix

                # Check thresholds; each level fires at most once per run.
                for lvl in alert_levels[sym]:
                    if not lvl["enabled"]:
                        continue
                    crossed = current_vix >= lvl["threshold"]
                    if crossed and lvl["name"] not in triggered_alerts[sym]:
                        logging.warning(
                            f"🚨 {sym} spike detected: {lvl['name']} "
                            f"(≥ {lvl['threshold']:.2f})!"
                        )
                        lvl["action"](sym, lvl["name"], current_vix, lvl["threshold"])
                        triggered_alerts[sym].add(lvl["name"])
        time.sleep(POLLING_INTERVAL)
    except Exception as e:
        # Keep the watcher alive: log the problem and try again after the
        # usual pause.
        logging.error(f"Error: {e}")
        time.sleep(POLLING_INTERVAL)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment