Skip to content

Instantly share code, notes, and snippets.

@FlyingFathead
Last active March 27, 2026 21:54
Show Gist options
  • Select an option

  • Save FlyingFathead/e6647c743373874ff69384bdedd2ff08 to your computer and use it in GitHub Desktop.

Select an option

Save FlyingFathead/e6647c743373874ff69384bdedd2ff08 to your computer and use it in GitHub Desktop.
VIX (CBOE Volatility Index) ticker-tracker
#!/usr/bin/env python3
#
# vix_ticker.py
#
# A simple Python program to monitor the Chicago Board Options Exchange's
# CBOE Volatility Index (^VIX) via Yahoo Finance.
#
# The purpose of this small program is to trigger warnings via Yahoo Finance
# in case of market panic or other volatility; useful knowledge for many.
#
# By FlyingFathead (https://github.com/FlyingFathead)
# (Initial version created Apr 11 2025 / Updated Mar 27 2026)
#
# Single-file gist version:
# - config block at top
# - versioned startup banner
# - prompted dependency installation (no blind auto-install)
# - local-timezone logging
# - previous-close baseline thresholds
# - optional TTS alerts
# - duplicate candle suppression for sub-minute polling on 1m data
################################################################################
# CONFIG / CONSTANTS
################################################################################
APP_NAME = "😰😰😰 VIX Ticker-Tracker 😰😰😰"
APP_VERSION = "0.4.32"
APP_AUTHOR = "FlyingFathead"
APP_URL = "https://github.com/FlyingFathead"
LOG_FILE = "vix_spike_watch.log"
DEFAULT_POLLING_INTERVAL = 30 # seconds
TTS_SYSTEM_TEST = True
ENABLE_TTS = True
SUPPRESS_DUPLICATE_TIMESTAMPS = True
DEFAULT_TICKERS = ["^VIX"]
# External packages required by this script.
# Key = import name, value = pip package name.
REQUIRED_PACKAGES = {
"yfinance": "yfinance",
"pyttsx3": "pyttsx3",
"packaging": "packaging",
}
# Spike thresholds, based on previous trading day's close.
SPIKE_THRESHOLDS_PCT = [5, 10, 15, 20, 25, 30, 40, 50]
# baseline rollover behavior
BASELINE_HISTORY_PERIOD = "14d"
LOG_BASELINE_ROLLOVERS = True
SPOKEN_NAME = {
"^VXIND1": "the VIX near-term futures",
"^VIX": "the spot VIX index",
"^DJI": "the Dow Jones index",
}
# VIX direction markers:
# up = worse, down = better, flat = neutral
VIX_UP_MARKER = "🔺"
VIX_DOWN_MARKER = "▽"
VIX_FLAT_MARKER = "▷"
# split analysis into two lines, true/false
SPLIT_ANALYSIS_LINES = True
# alert reset / re-arm behavior
ALERT_RESET_HYSTERESIS_PCT = 0.5 # re-arm once value drops 0.5% below threshold
LOG_ALERT_RESETS = True
# historical VIX record levels used for record-break alerts
# these are close-based records; change them if you prefer intraday records
ANNOUNCE_RECORD_BREAKS = True
VIX_RECORD_HIGH = 82.69
VIX_RECORD_HIGH_DATE = "2020-03-16"
VIX_RECORD_LOW = 9.14
VIX_RECORD_LOW_DATE = "2017-11-03"
# absolute VIX level alerts (not relative to previous close)
ANNOUNCE_ABSOLUTE_VIX_LEVELS = True
ABSOLUTE_VIX_ALERT_LEVELS = [20.0, 25.0, 30.0, 35.0, 40.0, 50.0]
# startup-only catch-up behavior for absolute VIX levels:
# if the script starts while VIX is already above multiple levels, only announce
# the highest crossed level once, and silently mark the lower crossed levels as
# already seen for this run.
STARTUP_ONLY_HIGHEST_ABSOLUTE_VIX_LEVEL = True
# startup-only catch-up behavior for percentage spike thresholds:
# if the script starts while the symbol is already above multiple percentage
# spike thresholds, only announce the highest crossed threshold once, and
# silently mark lower crossed thresholds as already seen for this run.
STARTUP_ONLY_HIGHEST_SPIKE_THRESHOLD = True
# intraday/session extreme tracking
LOG_INTRADAY_EXTREMES = True
# for version checking
MIN_TESTED_YFINANCE = "1.2.0"
CHECK_UPDATES_ON_STARTUP = False
PYPI_CHECK_TIMEOUT = 3.0
# for yfinance rate limits
RATE_LIMIT_BACKOFF_INITIAL = 60 # seconds
RATE_LIMIT_BACKOFF_MAX = 900 # 15 min
rate_limit_backoff_seconds = RATE_LIMIT_BACKOFF_INITIAL
# quiet heartbeat when polling continues but no new candle arrives
HEARTBEAT_WHEN_IDLE = True
HEARTBEAT_INTERVAL_SECONDS = 300 # 5 min
################################################################################
# STDLIB IMPORTS
################################################################################
import argparse
import atexit
import datetime
import importlib
import importlib.util
import logging
import math
import queue
import shlex
import shutil
import subprocess
import sys
import threading
import time
import warnings
from typing import Dict, List, Optional, Tuple
################################################################################
# BANNER / DISPLAY
################################################################################
def print_banner() -> None:
width = shutil.get_terminal_size(fallback=(79, 24)).columns
line = "-" * width
title = f"::: {APP_NAME} v{APP_VERSION}"
byline = f"::: by {APP_AUTHOR} | {APP_URL}"
print(line)
print(title)
print(byline)
print(line)
################################################################################
# OPTIONAL VERSION / UPDATE CHECKS
################################################################################
import json
import urllib.request
from importlib.metadata import PackageNotFoundError, version as dist_version
try:
from packaging.version import Version, InvalidVersion
except Exception:
Version = None
InvalidVersion = Exception
def get_installed_version(dist_name: str) -> Optional[str]:
try:
return dist_version(dist_name)
except PackageNotFoundError:
return None
except Exception:
return None
def version_lt(installed: str, minimum: str) -> bool:
"""
Proper version comparison when packaging is available.
Conservative fallback otherwise.
"""
if Version is not None:
try:
return Version(installed) < Version(minimum)
except InvalidVersion:
pass
# fallback: if we can't compare properly, only warn on non-match
return installed != minimum
def log_runtime_versions() -> None:
py_ver = sys.version.split()[0]
yf_ver = get_installed_version("yfinance") or "unknown"
tts_ver = get_installed_version("pyttsx3") or "unknown"
logging.info(
f"Runtime versions: Python {py_ver} | yfinance {yf_ver} | pyttsx3 {tts_ver}"
)
if yf_ver != "unknown" and version_lt(yf_ver, MIN_TESTED_YFINANCE):
logging.warning(
f"yfinance {yf_ver} is older than the minimum tested version "
f"{MIN_TESTED_YFINANCE}. Update recommended:\n"
f" {sys.executable} -m pip install -U yfinance"
)
def get_latest_pypi_version(project: str, timeout: float = PYPI_CHECK_TIMEOUT) -> Optional[str]:
url = f"https://pypi.org/pypi/{project}/json"
req = urllib.request.Request(
url,
headers={"User-Agent": f"{APP_NAME}/{APP_VERSION}"}
)
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
payload = json.load(resp)
return payload.get("info", {}).get("version")
except Exception as exc:
logging.warning(f"Could not check PyPI for {project}: {exc}")
return None
def check_for_updates() -> None:
installed = get_installed_version("yfinance")
if not installed:
return
latest = get_latest_pypi_version("yfinance")
if not latest:
return
if Version is not None:
try:
if Version(installed) < Version(latest):
logging.warning(
f"Update available for yfinance: installed {installed}, latest {latest}. "
f"Upgrade with:\n {sys.executable} -m pip install -U yfinance"
)
else:
logging.info(f"yfinance is up to date: {installed}")
return
except InvalidVersion:
pass
if installed != latest:
logging.warning(
f"Possible yfinance update available: installed {installed}, latest {latest}"
)
else:
logging.info(f"yfinance is up to date: {installed}")
################################################################################
# LOGGING WITH LOCAL TIMEZONE
################################################################################
def get_tz_abbreviation(dt: datetime.datetime) -> str:
tz_name = dt.astimezone().tzname()
mapping = {
"FLE Daylight Time": "EEST",
"FLE Standard Time": "EET",
}
return mapping.get(tz_name, tz_name or "LOCAL")
class TZFormatter(logging.Formatter):
def formatTime(self, record, datefmt=None):
ct = datetime.datetime.fromtimestamp(record.created).astimezone()
if datefmt:
s = ct.strftime(datefmt)
else:
s = ct.strftime("%Y-%m-%d %H:%M:%S")
tz_abbr = get_tz_abbreviation(ct)
return f"{s} {tz_abbr}"
def setup_logging() -> None:
formatter = TZFormatter("%(asctime)s - %(levelname)s - %(message)s")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
file_handler = logging.FileHandler(LOG_FILE, encoding="utf-8")
file_handler.setFormatter(formatter)
root = logging.getLogger()
root.setLevel(logging.INFO)
root.handlers.clear()
root.addHandler(stream_handler)
root.addHandler(file_handler)
################################################################################
# DEPENDENCY CHECK / PROMPTED INSTALL
################################################################################
def is_package_installed(import_name: str) -> bool:
return importlib.util.find_spec(import_name) is not None
def find_missing_packages(required_packages: Dict[str, str]) -> List[Tuple[str, str]]:
missing = []
for import_name, pip_name in required_packages.items():
if not is_package_installed(import_name):
missing.append((import_name, pip_name))
return missing
def prompt_install_missing_packages(missing_packages: List[Tuple[str, str]]) -> None:
if not missing_packages:
return
print()
print("Missing required packages detected:")
for import_name, pip_name in missing_packages:
if import_name == pip_name:
print(f" - {pip_name}")
else:
print(f" - {pip_name} (import: {import_name})")
print()
while True:
response = input(
"Install these now? [y/N] "
"(enter 'n' to abort the whole program): "
).strip().lower()
if response in ("", "n", "no"):
print("Aborting at user request.")
raise SystemExit(1)
if response in ("y", "yes"):
break
print("Please answer y or n.")
default_prefix = f'"{sys.executable}" -m pip install'
install_prefix = input(
f"Preferred install command prefix\n"
f"[default: {default_prefix}]\n> "
).strip()
if not install_prefix:
install_prefix = default_prefix
cmd = shlex.split(install_prefix) + [pip_name for _, pip_name in missing_packages]
print()
print("Running install command:")
print(" " + " ".join(cmd))
print()
try:
subprocess.check_call(cmd)
importlib.invalidate_caches()
except subprocess.CalledProcessError as exc:
print(f"Dependency install failed: {exc}")
raise SystemExit(1)
still_missing = find_missing_packages(REQUIRED_PACKAGES)
if still_missing:
print("Some packages are still missing after installation:")
for import_name, pip_name in still_missing:
print(f" - {pip_name} (import: {import_name})")
raise SystemExit(1)
################################################################################
# DEPENDENCY BOOTSTRAP
################################################################################
print_banner()
setup_logging()
missing_packages = find_missing_packages(REQUIRED_PACKAGES)
prompt_install_missing_packages(missing_packages)
import yfinance as yf # noqa: E402
import pyttsx3 # noqa: E402
from yfinance.exceptions import YFRateLimitError # noqa: E402
warnings.simplefilter(action="ignore", category=FutureWarning)
################################################################################
# CMDLINE ARGS
################################################################################
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Monitor VIX spikes from previous close.")
parser.add_argument(
"--interval",
type=int,
default=DEFAULT_POLLING_INTERVAL,
help="Polling interval in seconds",
)
parser.add_argument(
"--no-tts",
action="store_true",
help="Disable text-to-speech even if ENABLE_TTS is True",
)
parser.add_argument(
"--check-updates",
action="store_true",
help="Check PyPI for newer package versions on startup",
)
return parser.parse_args()
args = parse_args()
POLLING_INTERVAL = args.interval
TTS_ENABLED = ENABLE_TTS and not args.no_tts
log_runtime_versions()
if CHECK_UPDATES_ON_STARTUP or args.check_updates:
check_for_updates()
################################################################################
# TTS
################################################################################
TTS_QUEUE_MAXSIZE = 32
TTS_DROP_WHEN_BUSY = True
TTS_DEDUP_CONSECUTIVE = True
_tts_queue = queue.Queue(maxsize=TTS_QUEUE_MAXSIZE)
_tts_thread = None
_tts_last_text = None
_tts_worker_failed = False
def _tts_worker() -> None:
global _tts_worker_failed
try:
engine = pyttsx3.init()
except Exception as exc:
_tts_worker_failed = True
logging.error(f"TTS init error: {exc}")
return
while True:
text = _tts_queue.get()
try:
if text is None:
return
engine.say(text)
engine.runAndWait()
except Exception as exc:
logging.error(f"TTS worker error: {exc}")
finally:
_tts_queue.task_done()
def ensure_tts_worker() -> None:
global _tts_thread
if _tts_worker_failed:
return
if _tts_thread is None or not _tts_thread.is_alive():
_tts_thread = threading.Thread(
target=_tts_worker,
name="tts-worker",
daemon=True,
)
_tts_thread.start()
def stop_tts_worker() -> None:
if _tts_thread is not None and _tts_thread.is_alive():
try:
_tts_queue.put_nowait(None)
except queue.Full:
pass
atexit.register(stop_tts_worker)
def speak(text: str) -> None:
global _tts_last_text
if not TTS_ENABLED:
return
ensure_tts_worker()
if _tts_worker_failed:
return
if TTS_DEDUP_CONSECUTIVE and text == _tts_last_text:
return
try:
if TTS_DROP_WHEN_BUSY:
_tts_queue.put_nowait(text)
else:
_tts_queue.put(text)
_tts_last_text = text
except queue.Full:
logging.warning("TTS queue full; dropping speech message.")
################################################################################
# HELPERS
################################################################################
def safe_pct_from_baseline(current_value: float, baseline: float, sym: str, label: str) -> float:
if baseline > 0:
return ((current_value / baseline) - 1.0) * 100.0
logging.warning(
f"{sym}: baseline is non-positive ({baseline}); forcing {label} to 0.0"
)
return 0.0
def reset_rate_limit_backoff() -> None:
global rate_limit_backoff_seconds
rate_limit_backoff_seconds = RATE_LIMIT_BACKOFF_INITIAL
def increase_rate_limit_backoff() -> int:
global rate_limit_backoff_seconds
wait_s = rate_limit_backoff_seconds
rate_limit_backoff_seconds = min(rate_limit_backoff_seconds * 2, RATE_LIMIT_BACKOFF_MAX)
return wait_s
def get_yf_interval(seconds: int) -> str:
"""
Map polling interval in seconds to a yfinance interval string.
If the polling interval is below 60s, yfinance still only gives 1m data,
so we clamp to '1m' and log a warning.
"""
if seconds < 60:
logging.warning(
f"Polling interval {seconds}s is less than 1 minute. Using '1m' for yfinance data."
)
return "1m"
if seconds == 60:
return "1m"
if seconds == 120:
return "2m"
if seconds == 300:
return "5m"
if seconds == 900:
return "15m"
if seconds == 1800:
return "30m"
if seconds == 3600:
return "60m"
if seconds == 5400:
return "90m"
logging.warning(
f"Polling interval {seconds}s does not map directly to a standard "
f"yfinance intraday interval. Using '1m'."
)
return "1m"
def format_dt_index_value(ts) -> str:
"""
Convert a pandas/DatetimeIndex timestamp to a clean UTC string if possible.
Falls back to str(ts) if needed.
"""
if hasattr(ts, "to_pydatetime"):
dt = ts.to_pydatetime()
if dt.tzinfo is None:
return dt.strftime("%Y-%m-%d %H:%M:%S")
return dt.astimezone(datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
return str(ts)
def get_latest_close_and_timestamp(data, sym: str) -> Tuple[Optional[float], Optional[str]]:
"""
Return the latest close value and timestamp for a symbol from a yfinance
download dataframe.
Supports both:
- MultiIndex columns: ('Close', '^VIX')
- Single-level columns: 'Close'
"""
try:
if getattr(data.columns, "nlevels", 1) > 1:
if ("Close", sym) not in data.columns:
return None, None
series = data[("Close", sym)].dropna()
else:
if "Close" not in data.columns:
return None, None
series = data["Close"].dropna()
if series.empty:
return None, None
value = float(series.iloc[-1])
timestamp = format_dt_index_value(series.index[-1])
return value, timestamp
except Exception:
return None, None
def get_reference_session_date(current_timestamp: Optional[str] = None) -> datetime.date:
"""
Return the current session/reference date as a UTC date.
For U.S. market intraday timestamps in this script, using the UTC date is
sufficient because the trading session does not cross UTC midnight.
"""
if current_timestamp:
try:
dt = datetime.datetime.strptime(current_timestamp, "%Y-%m-%d %H:%M:%S")
return dt.date()
except Exception:
pass
return datetime.datetime.now(datetime.timezone.utc).date()
def get_previous_trading_close_info(
ticker: str,
reference_date: datetime.date,
) -> Tuple[Optional[float], Optional[str]]:
"""
Return the last completed daily close strictly BEFORE the given reference_date.
"""
try:
t = yf.Ticker(ticker)
info = t.history(period=BASELINE_HISTORY_PERIOD, interval="1d", auto_adjust=False)
except YFRateLimitError:
logging.warning(
f"{ticker}: Yahoo rate-limited baseline fetch. Will retry later."
)
return None, None
except Exception as exc:
logging.error(
f"{ticker}: error while fetching previous trading close: {exc}"
)
return None, None
if info is None or info.empty or "Close" not in info.columns:
logging.warning(
f"No usable daily history returned for {ticker}. Cannot determine previous close yet."
)
return None, None
closes = info["Close"].dropna()
if closes.empty:
logging.warning(
f"Daily history for {ticker} contains no usable close values. Cannot determine previous close yet."
)
return None, None
try:
eligible = closes[closes.index.date < reference_date]
if eligible.empty:
logging.warning(
f"No completed daily close earlier than {reference_date.isoformat()} "
f"for {ticker}. Cannot determine previous close yet."
)
return None, None
close_value = float(eligible.iloc[-1])
close_date = eligible.index[-1].strftime("%Y-%m-%d")
if math.isnan(close_value):
logging.warning(
f"Previous trading close for {ticker} ({close_date}) is NaN. "
f"Cannot determine baseline yet."
)
return None, None
return close_value, close_date
except Exception as exc:
logging.error(
f"Unexpected error getting previous trading close for {ticker}: {exc}"
)
return None, None
def build_alert_levels_for_symbol(sym: str, baseline: float) -> list:
"""
Build threshold definitions for a single symbol from a baseline.
"""
levels = []
for pct in SPIKE_THRESHOLDS_PCT:
levels.append(
{
"name": f"{pct}% Spike",
"pct": pct,
"threshold": baseline * (1.0 + pct / 100.0),
"action": run_custom_script,
"enabled": True,
}
)
return levels
def log_thresholds_for_symbol(sym: str, reference_date: datetime.date) -> None:
"""
Log current baseline and thresholds for one symbol, anchored to the session date.
"""
logging.info(
f"Baseline for {sym}: {baseline_dict[sym]:.2f} "
f"(previous close date: {baseline_date_dict[sym]}; "
f"session date: {reference_date.isoformat()})"
)
for lvl in alert_levels[sym]:
if lvl["enabled"]:
logging.info(f" {sym} -> {lvl['name']}: {lvl['threshold']:.2f}")
def refresh_baseline_for_symbol(
sym: str,
reference_date: datetime.date,
force: bool = False,
) -> bool:
"""
Refresh the symbol baseline from the previous trading close relative to
the given reference_date.
Returns True if the baseline changed, False otherwise.
On change:
- baseline_dict[sym] is updated
- baseline_date_dict[sym] is updated
- alert thresholds are rebuilt
- triggered alerts are cleared/re-armed
"""
new_baseline, new_baseline_date = get_previous_trading_close_info(sym, reference_date)
if new_baseline is None or new_baseline_date is None:
return False
old_baseline = baseline_dict.get(sym)
old_baseline_date = baseline_date_dict.get(sym)
changed = (
force
or old_baseline is None
or old_baseline_date != new_baseline_date
or not math.isclose(old_baseline, new_baseline, rel_tol=0.0, abs_tol=1e-12)
)
if not changed:
return False
baseline_dict[sym] = new_baseline
baseline_date_dict[sym] = new_baseline_date
alert_levels[sym] = build_alert_levels_for_symbol(sym, new_baseline)
triggered_alerts[sym].clear()
if old_baseline is None:
logging.info(
f"Fetched baseline for {sym}: {new_baseline:.2f} "
f"(previous trading close: {new_baseline_date}; "
f"session date: {reference_date.isoformat()})"
)
else:
if LOG_BASELINE_ROLLOVERS:
logging.info(
f"🔄 {sym} baseline rolled from {old_baseline:.2f} ({old_baseline_date}) "
f"to {new_baseline:.2f} ({new_baseline_date}) "
f"for session date {reference_date.isoformat()}; "
f"thresholds rebuilt and alerts re-armed."
)
log_thresholds_for_symbol(sym, reference_date)
return True
def severity_for_absolute_vix(value: float) -> Tuple[str, str]:
"""
Return emoji + label for absolute VIX regime severity.
Custom but sane VIX fear bands.
"""
if value < 12:
return "😴", "complacent"
if value < 20:
return "🙂", "normal-ish"
if value < 25:
return "😐", "elevated"
if value < 30:
return "😰", "stressed"
if value < 32:
return "🚨😬", "high fear"
if value < 35:
return "🚨😫", "acute fear"
if value < 40:
return "🚨😱", "panic"
if value < 50:
return "🚨💀⚠️", "crisis"
return "🚨☠️💥", "meltdown"
def get_vix_direction_marker(difference: float) -> str:
"""
Return a direction marker for VIX.
For VIX:
- up = worse = use red emoji triangle
- down = better = use neutral plain triangle
- flat = neutral plain pointer
"""
if difference > 0:
return VIX_UP_MARKER
if difference < 0:
return VIX_DOWN_MARKER
return VIX_FLAT_MARKER
def format_date_for_tts(date_str: str) -> str:
"""
Convert YYYY-MM-DD to a TTS-friendly spoken date like:
'March 16, 2020'
"""
dt = datetime.datetime.strptime(date_str, "%Y-%m-%d")
return dt.strftime("%B %d, %Y").replace(" 0", " ")
def check_and_announce_vix_record_break(sym: str, current_value: float) -> None:
"""
Announce new VIX record breaks once per run.
This compares the live/current value against the configured historical
record levels in the config block.
"""
if not ANNOUNCE_RECORD_BREAKS:
return
if sym != "^VIX":
return
state = record_break_state[sym]
# New all-time high
if current_value > VIX_RECORD_HIGH and not state["high_announced"]:
prev_date_spoken = format_date_for_tts(VIX_RECORD_HIGH_DATE)
logging.critical(
f"💥 ^VIX NEW RECORD HIGH: {current_value:.2f} exceeded previous record "
f"{VIX_RECORD_HIGH:.2f} from {VIX_RECORD_HIGH_DATE}!"
)
msg = (
f"Warning! The spot VIX index has surpassed its previous record high of "
f"{VIX_RECORD_HIGH:.2f}, set on {prev_date_spoken}. "
f"The current reading is {current_value:.2f}. "
f"This is a new all time high."
)
logging.info(f"[TTS Message] {msg}")
speak(msg)
state["high_announced"] = True
# New all-time low
if current_value < VIX_RECORD_LOW and not state["low_announced"]:
prev_date_spoken = format_date_for_tts(VIX_RECORD_LOW_DATE)
logging.warning(
f"🕊️ ^VIX NEW RECORD LOW: {current_value:.2f} went below previous record "
f"{VIX_RECORD_LOW:.2f} from {VIX_RECORD_LOW_DATE}!"
)
msg = (
f"Smooth sailing! The spot VIX index has gone below its previous record low of "
f"{VIX_RECORD_LOW:.2f}, set on {prev_date_spoken}. "
f"The current reading is {current_value:.2f}. "
f"This is a new all time low."
)
logging.info(f"[TTS Message] {msg}")
speak(msg)
state["low_announced"] = True
def init_intraday_state_for_symbol() -> Dict[str, Optional[object]]:
"""
Initialize/reset stored session intraday extremes for one symbol.
This is just local state for what has already been logged/tracked;
the actual values are derived from yfinance OHLC data.
"""
return {
"high": None,
"high_ts": None,
"low": None,
"low_ts": None,
}
def get_session_intraday_extremes(
data,
sym: str,
) -> Tuple[Optional[float], Optional[str], Optional[float], Optional[str]]:
"""
Return the true intraday/session high and low so far from the downloaded
OHLC data, using the session's High and Low columns.
Returns:
(session_high, session_high_ts, session_low, session_low_ts)
"""
try:
if getattr(data.columns, "nlevels", 1) > 1:
high_key = ("High", sym)
low_key = ("Low", sym)
if high_key not in data.columns or low_key not in data.columns:
return None, None, None, None
high_series = data[high_key].dropna()
low_series = data[low_key].dropna()
else:
if "High" not in data.columns or "Low" not in data.columns:
return None, None, None, None
high_series = data["High"].dropna()
low_series = data["Low"].dropna()
if high_series.empty or low_series.empty:
return None, None, None, None
session_high = float(high_series.max())
session_low = float(low_series.min())
high_ts = format_dt_index_value(high_series.idxmax())
low_ts = format_dt_index_value(low_series.idxmin())
return session_high, high_ts, session_low, low_ts
except Exception:
return None, None, None, None
def update_intraday_extremes_from_data(data, sym: str) -> None:
"""
Update/log true intraday high/low using OHLC data from yfinance.
"""
session_high, session_high_ts, session_low, session_low_ts = get_session_intraday_extremes(data, sym)
if session_high is None or session_low is None:
return
state = intraday_extremes[sym]
if state["high"] is None or session_high > state["high"]:
state["high"] = session_high
state["high_ts"] = session_high_ts
if LOG_INTRADAY_EXTREMES:
logging.info(
f"📈 {sym} true intraday high: {session_high:.2f} "
f"(timestamp [UTC]: {session_high_ts})"
)
if state["low"] is None or session_low < state["low"]:
state["low"] = session_low
state["low_ts"] = session_low_ts
if LOG_INTRADAY_EXTREMES:
logging.info(
f"📉 {sym} true intraday low: {session_low:.2f} "
f"(timestamp [UTC]: {session_low_ts})"
)
def build_absolute_vix_level_message(level: float, current_value: float) -> str:
if level >= 50:
return (
f"Critical warning! The spot VIX index has crossed {level:.0f}. "
f"The current reading is {current_value:.2f}. "
f"This is extreme panic territory."
)
elif level >= 40:
return (
f"Warning! The spot VIX index has crossed {level:.0f}. "
f"The current reading is {current_value:.2f}. "
f"This is crisis territory."
)
elif level >= 35:
return (
f"Warning! The spot VIX index has crossed {level:.0f}. "
f"The current reading is {current_value:.2f}. "
f"This is severe market stress."
)
else:
return (
f"Attention! The spot VIX index has crossed {level:.0f}. "
f"The current reading is {current_value:.2f}. "
f"Fear is elevated."
)
def announce_absolute_vix_level(
sym: str,
current_value: float,
level: float,
startup_catchup: bool = False,
) -> None:
prefix = "startup catch-up: " if startup_catchup else ""
logging.warning(
f"🚨 {sym} {prefix}absolute level crossed: "
f"{current_value:.2f} >= {level:.2f}"
)
msg = build_absolute_vix_level_message(level, current_value)
logging.info(f"[TTS Message] {msg}")
speak(msg)
absolute_level_alerts[sym].add(str(level))
def check_and_announce_absolute_vix_levels(
sym: str,
current_value: float,
startup_catchup: bool = False,
) -> None:
"""
Announce absolute VIX level crossings such as 20, 25, 30, 40, 50.
Startup catch-up mode:
- if enabled and this is the first processed sample of the run,
only the highest crossed level is announced
- lower crossed levels are silently marked as already seen
- later polling behaves normally again
"""
if not ANNOUNCE_ABSOLUTE_VIX_LEVELS:
return
if sym != "^VIX":
return
crossed_levels = [level for level in ABSOLUTE_VIX_ALERT_LEVELS if current_value >= level]
if not crossed_levels:
return
if startup_catchup and STARTUP_ONLY_HIGHEST_ABSOLUTE_VIX_LEVEL:
highest_crossed = max(crossed_levels)
highest_key = str(highest_crossed)
if highest_key not in absolute_level_alerts[sym]:
announce_absolute_vix_level(
sym=sym,
current_value=current_value,
level=highest_crossed,
startup_catchup=True,
)
# Mark all already-crossed lower bands as seen so they don't spam
# on the next poll while the value is still above them.
for level in crossed_levels:
absolute_level_alerts[sym].add(str(level))
return
for level in ABSOLUTE_VIX_ALERT_LEVELS:
level_key = str(level)
if current_value >= level and level_key not in absolute_level_alerts[sym]:
announce_absolute_vix_level(
sym=sym,
current_value=current_value,
level=level,
startup_catchup=False,
)
def process_absolute_vix_level_resets(sym: str, current_value: float) -> None:
"""
Re-arm absolute VIX level alerts if the index falls sufficiently below them.
"""
if sym != "^VIX":
return
for level in ABSOLUTE_VIX_ALERT_LEVELS:
level_key = str(level)
if level_key not in absolute_level_alerts[sym]:
continue
reset_level = level * (1.0 - ALERT_RESET_HYSTERESIS_PCT / 100.0)
if current_value < reset_level:
absolute_level_alerts[sym].remove(level_key)
if LOG_ALERT_RESETS:
logging.info(
f"↩️ {sym} reset: absolute level {level:.2f} re-armed "
f"(current {current_value:.2f} < reset level {reset_level:.2f})"
)
def log_vix_snapshot(
sym: str,
current_value: float,
arrow: str,
difference: float,
pct_change: float,
baseline: float,
baseline_pct: float,
sev_emoji: str,
sev_label: str,
current_timestamp: str,
is_bootstrap_sample: bool = False,
) -> None:
"""
Log the market snapshot either as:
- one line, or
- two lines with severity/timestamp split below
Controlled by SPLIT_ANALYSIS_LINES.
"""
if is_bootstrap_sample:
primary_msg = (
f"{sym}: {current_value:.2f} "
f"| first sample this run "
f"| baseline {baseline:.2f} ({baseline_pct:+.2f}%)"
)
else:
primary_msg = (
f"{sym}: {current_value:.2f} {arrow} "
f"({difference:+.2f} pts, {pct_change:+.2f}%) "
f"| baseline {baseline:.2f} ({baseline_pct:+.2f}%)"
)
analysis_msg = (
f" severity {sev_emoji} {sev_label} "
f"| data timestamp [UTC]: {current_timestamp}"
)
if SPLIT_ANALYSIS_LINES:
logging.info(primary_msg)
logging.info(analysis_msg)
else:
logging.info(
f"{primary_msg} | severity {sev_emoji} {sev_label} "
f"| data timestamp [UTC]: {current_timestamp}"
)
def process_alert_resets(
sym: str,
current_value: float,
alert_levels: Dict[str, list],
triggered_alerts: Dict[str, set],
) -> None:
"""
Re-arm previously triggered alerts once the value falls sufficiently back
below the threshold.
Uses ALERT_RESET_HYSTERESIS_PCT to avoid flapping when price hovers right
around a threshold.
"""
for lvl in alert_levels[sym]:
if not lvl["enabled"]:
continue
alert_name = lvl["name"]
threshold = lvl["threshold"]
if alert_name not in triggered_alerts[sym]:
continue
reset_level = threshold * (1.0 - ALERT_RESET_HYSTERESIS_PCT / 100.0)
if current_value < reset_level:
triggered_alerts[sym].remove(alert_name)
if LOG_ALERT_RESETS:
logging.info(
f"↩️ {sym} reset: {alert_name} re-armed "
f"(current {current_value:.2f} < reset level {reset_level:.2f}; "
f"threshold was {threshold:.2f})"
)
def check_and_announce_spike_thresholds(
sym: str,
current_value: float,
startup_catchup: bool = False,
) -> None:
"""
Announce percentage spike thresholds relative to baseline.
Startup catch-up mode:
- if enabled and this is the first processed sample of the run,
only the highest crossed spike threshold is announced
- lower crossed thresholds are silently marked as already seen
- later polling behaves normally again
"""
enabled_levels = [lvl for lvl in alert_levels[sym] if lvl["enabled"]]
if not enabled_levels:
return
crossed_levels = [lvl for lvl in enabled_levels if current_value >= lvl["threshold"]]
if not crossed_levels:
return
if startup_catchup and STARTUP_ONLY_HIGHEST_SPIKE_THRESHOLD:
highest_crossed = max(crossed_levels, key=lambda lvl: lvl["threshold"])
highest_name = highest_crossed["name"]
if highest_name not in triggered_alerts[sym]:
logging.warning(
f"🚨 {sym} startup catch-up: spike detected: {highest_crossed['name']} "
f"(≥ {highest_crossed['threshold']:.2f})!"
)
highest_crossed["action"](
sym,
highest_crossed["name"],
current_value,
highest_crossed["threshold"],
)
for lvl in crossed_levels:
triggered_alerts[sym].add(lvl["name"])
return
for lvl in enabled_levels:
if current_value >= lvl["threshold"] and lvl["name"] not in triggered_alerts[sym]:
logging.warning(
f"🚨 {sym} spike detected: {lvl['name']} "
f"(≥ {lvl['threshold']:.2f})!"
)
lvl["action"](sym, lvl["name"], current_value, lvl["threshold"])
triggered_alerts[sym].add(lvl["name"])
def maybe_log_idle_heartbeat(
sym: str,
current_timestamp: str,
current_value: float,
) -> None:
"""
Throttled sign-of-life log when polling continues but the latest candle
timestamp has not advanced.
Logs at most once per HEARTBEAT_INTERVAL_SECONDS per symbol.
"""
if not HEARTBEAT_WHEN_IDLE:
return
now = time.time()
last_log = idle_heartbeat_log_times.get(sym, 0.0)
last_new_data = last_new_data_epochs.get(sym, 0.0)
duplicate_polls = idle_duplicate_poll_counts.get(sym, 0)
if last_log and (now - last_log) < HEARTBEAT_INTERVAL_SECONDS:
return
idle_for_s = int(now - last_new_data) if last_new_data > 0 else 0
logging.info(
f"{sym}: still polling; no new candle yet "
f"(latest timestamp [UTC]: {current_timestamp}, "
f"last value: {current_value:.2f}, "
f"idle for ~{idle_for_s}s, duplicate polls: {duplicate_polls})"
)
idle_heartbeat_log_times[sym] = now
################################################################################
# ALERT FUNCTIONS
################################################################################
def run_custom_script(symbol: str, threshold_label: str, current_value: float, threshold: float) -> None:
logging.info(
f"[run_custom_script] {symbol}: {threshold_label} threshold reached. "
f"Current: {current_value:.2f}, Threshold: {threshold:.2f}"
)
baseline = baseline_dict[symbol]
actual_spike_pct = safe_pct_from_baseline(
current_value,
baseline,
symbol,
"actual_spike_pct",
)
friendly_symbol = SPOKEN_NAME.get(symbol, symbol)
msg = (
f"Attention! {friendly_symbol} triggered {threshold_label}. "
f"Baseline was {baseline:.2f}, and it is now {current_value:.2f}, "
f"a total spike of {actual_spike_pct:.2f} percent from baseline."
)
if actual_spike_pct >= 90:
msg += " DROP THE BOMB! EXTERMINATE THEM ALL!"
elif actual_spike_pct >= 80:
msg += " WE ARE DOOMED! THIS IS THE END!"
elif actual_spike_pct >= 60:
msg += " YAHOO-HO-HO-HOO! LOOKS LIKE WE'RE GONE AND NEVER COMING BACK!"
elif actual_spike_pct >= 50:
msg += " Oh wow. Looks like we're pretty much screwed."
elif actual_spike_pct >= 40:
msg += " System meltdown is imminent. The index is now in panic territory."
elif actual_spike_pct >= 30:
msg += " This is a red alert scenario. Conditions are extremely volatile."
elif actual_spike_pct >= 20:
msg += " Warning: volatility is approaching meltdown levels."
logging.info(f"[TTS Message] {msg}")
speak(msg)
################################################################################
# SETUP
################################################################################
YF_INTERVAL_STRING = get_yf_interval(POLLING_INTERVAL)
logging.info(f"Using polling interval: {POLLING_INTERVAL}s")
logging.info(f"Using yfinance data interval: {YF_INTERVAL_STRING}")
tickers = list(DEFAULT_TICKERS)
baseline_dict: Dict[str, float] = {}
baseline_date_dict: Dict[str, str] = {}
session_date_dict: Dict[str, datetime.date] = {}
previous_values: Dict[str, Optional[float]] = {}
last_data_timestamps: Dict[str, Optional[str]] = {}
idle_heartbeat_log_times: Dict[str, float] = {}
idle_duplicate_poll_counts: Dict[str, int] = {}
last_new_data_epochs: Dict[str, float] = {}
alert_levels: Dict[str, list] = {sym: [] for sym in tickers}
triggered_alerts: Dict[str, set] = {sym: set() for sym in tickers}
absolute_level_alerts: Dict[str, set] = {sym: set() for sym in tickers}
record_break_state: Dict[str, Dict[str, bool]] = {
sym: {"high_announced": False, "low_announced": False}
for sym in tickers
}
intraday_extremes: Dict[str, Dict[str, Optional[object]]] = {
sym: init_intraday_state_for_symbol()
for sym in tickers
}
logging.info("Starting VIX watchers...")
startup_session_date = get_reference_session_date()
startup_catchup_pending: Dict[str, bool] = {sym: True for sym in tickers}
for sym in tickers:
session_date_dict[sym] = startup_session_date
previous_values[sym] = None
last_data_timestamps[sym] = None
idle_heartbeat_log_times[sym] = 0.0
idle_duplicate_poll_counts[sym] = 0
last_new_data_epochs[sym] = 0.0
try:
while True:
all_ready = True
for sym in tickers:
if sym in baseline_dict:
continue
ok = refresh_baseline_for_symbol(
sym,
reference_date=startup_session_date,
force=True,
)
if not ok:
all_ready = False
if all_ready:
reset_rate_limit_backoff()
break
wait_s = increase_rate_limit_backoff()
logging.warning(
f"Startup baseline initialization incomplete or rate-limited. Retrying in {wait_s}s..."
)
time.sleep(wait_s)
except KeyboardInterrupt:
logging.info("Interrupted by user during startup. Exiting.")
raise SystemExit(0)
if TTS_SYSTEM_TEST and TTS_ENABLED:
speak("Volatility index monitoring started.")
################################################################################
# MAIN LOOP
################################################################################
while True:
try:
try:
data = yf.download(
tickers,
period="1d",
interval=YF_INTERVAL_STRING,
progress=False,
auto_adjust=False,
threads=False,
)
reset_rate_limit_backoff()
except YFRateLimitError:
wait_s = increase_rate_limit_backoff()
logging.warning(
f"Yahoo rate-limited intraday download. Backing off for {wait_s}s."
)
time.sleep(wait_s)
continue
if data.empty:
logging.warning(
f"No data returned for tickers using interval {YF_INTERVAL_STRING}. Retrying..."
)
time.sleep(POLLING_INTERVAL)
continue
for sym in tickers:
current_value, current_timestamp = get_latest_close_and_timestamp(data, sym)
if current_value is None or current_timestamp is None or math.isnan(current_value):
fallback = previous_values.get(sym)
if fallback is not None and not math.isnan(fallback):
logging.warning(
f"{sym}: No new usable data in last fetch "
f"(interval: {YF_INTERVAL_STRING}). Using fallback: {fallback:.2f}"
)
current_value = fallback
current_timestamp = last_data_timestamps.get(sym) or "unknown"
else:
logging.warning(f"{sym}: No data and no prior fallback. Skipping.")
continue
if (
SUPPRESS_DUPLICATE_TIMESTAMPS
and current_timestamp == last_data_timestamps.get(sym)
):
idle_duplicate_poll_counts[sym] += 1
maybe_log_idle_heartbeat(
sym=sym,
current_timestamp=current_timestamp,
current_value=current_value,
)
continue
current_session_date = get_reference_session_date(current_timestamp)
if current_session_date != session_date_dict.get(sym):
refreshed = refresh_baseline_for_symbol(
sym,
reference_date=current_session_date,
force=False,
)
if refreshed:
session_date_dict[sym] = current_session_date
intraday_extremes[sym] = init_intraday_state_for_symbol()
absolute_level_alerts[sym].clear()
previous_values[sym] = None
last_data_timestamps[sym] = None
idle_heartbeat_log_times[sym] = 0.0
idle_duplicate_poll_counts[sym] = 0
last_new_data_epochs[sym] = 0.0
startup_catchup_pending[sym] = True
else:
logging.warning(
f"{sym}: Failed to refresh baseline for new session date "
f"{current_session_date.isoformat()}. Skipping this poll and will retry next poll."
)
continue
prev_value = previous_values.get(sym)
if prev_value is None or math.isnan(prev_value):
difference = 0.0
pct_change = 0.0
arrow = VIX_FLAT_MARKER
is_bootstrap_sample = True
else:
difference = current_value - prev_value
pct_change = (difference / prev_value * 100.0) if prev_value else 0.0
arrow = get_vix_direction_marker(difference)
is_bootstrap_sample = False
if sym not in baseline_dict:
logging.warning(
f"{sym}: baseline not available yet. Skipping analysis until baseline fetch succeeds."
)
continue
baseline = baseline_dict[sym]
baseline_pct = safe_pct_from_baseline(
current_value,
baseline,
sym,
"baseline_pct",
)
sev_emoji, sev_label = severity_for_absolute_vix(current_value)
log_vix_snapshot(
sym=sym,
current_value=current_value,
arrow=arrow,
difference=difference,
pct_change=pct_change,
baseline=baseline,
baseline_pct=baseline_pct,
sev_emoji=sev_emoji,
sev_label=sev_label,
current_timestamp=current_timestamp,
is_bootstrap_sample=is_bootstrap_sample,
)
update_intraday_extremes_from_data(data, sym)
check_and_announce_absolute_vix_levels(
sym,
current_value,
startup_catchup=startup_catchup_pending.get(sym, False),
)
check_and_announce_vix_record_break(sym, current_value)
process_alert_resets(
sym=sym,
current_value=current_value,
alert_levels=alert_levels,
triggered_alerts=triggered_alerts,
)
process_absolute_vix_level_resets(sym, current_value)
check_and_announce_spike_thresholds(
sym,
current_value,
startup_catchup=startup_catchup_pending.get(sym, False),
)
idle_duplicate_poll_counts[sym] = 0
last_new_data_epochs[sym] = time.time()
# idle_heartbeat_log_times[sym] = last_new_data_epochs[sym]
idle_heartbeat_log_times[sym] = 0.0
previous_values[sym] = current_value
last_data_timestamps[sym] = current_timestamp
startup_catchup_pending[sym] = False
time.sleep(POLLING_INTERVAL)
except KeyboardInterrupt:
logging.info("Interrupted by user. Exiting.")
break
except Exception:
logging.exception("Unhandled error in main loop")
time.sleep(POLLING_INTERVAL)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment