Last active
March 27, 2026 21:54
-
-
Save FlyingFathead/e6647c743373874ff69384bdedd2ff08 to your computer and use it in GitHub Desktop.
VIX (CBOE Volatility Index) ticker-tracker
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # | |
| # vix_ticker.py | |
| # | |
| # A simple Python program to monitor the Chicago Board Options Exchange's | |
| # CBOE Volatility Index (^VIX) via Yahoo Finance. | |
| # | |
| # The purpose of this small program is to raise warnings (using data fetched | |
| # from Yahoo Finance) on market panic or other volatility; useful knowledge for many. | |
| # | |
| # By FlyingFathead (https://github.com/FlyingFathead) | |
| # (Initial version created Apr 11 2025 / Updated Mar 27 2026) | |
| # | |
| # Single-file gist version: | |
| # - config block at top | |
| # - versioned startup banner | |
| # - prompted dependency installation (no blind auto-install) | |
| # - local-timezone logging | |
| # - previous-close baseline thresholds | |
| # - optional TTS alerts | |
| # - duplicate candle suppression for sub-minute polling on 1m data | |
| ################################################################################ | |
| # CONFIG / CONSTANTS | |
| ################################################################################ | |
# Application identity; shown in the startup banner.
APP_NAME = "😰😰😰 VIX Ticker-Tracker 😰😰😰"
APP_VERSION = "0.4.32"
APP_AUTHOR = "FlyingFathead"
APP_URL = "https://github.com/FlyingFathead"
# Log file name passed to logging.FileHandler (a relative name resolves
# against the current working directory).
LOG_FILE = "vix_spike_watch.log"
# Default for the --interval command-line option.
DEFAULT_POLLING_INTERVAL = 30  # seconds
# NOTE(review): not referenced in the visible part of the file; presumably
# triggers a spoken self-test at startup -- confirm against the main loop.
TTS_SYSTEM_TEST = True
# Master switch for text-to-speech; can still be turned off with --no-tts.
ENABLE_TTS = True
# NOTE(review): presumably skips re-processing a candle whose timestamp was
# already handled (see "duplicate candle suppression" in the file header) -- confirm.
SUPPRESS_DUPLICATE_TIMESTAMPS = True
DEFAULT_TICKERS = ["^VIX"]
# External packages required by this script.
# Key = import name, value = pip package name.
REQUIRED_PACKAGES = {
    "yfinance": "yfinance",
    "pyttsx3": "pyttsx3",
    "packaging": "packaging",
}
# Spike thresholds, based on previous trading day's close.
# Each entry is a percentage above the baseline close.
SPIKE_THRESHOLDS_PCT = [5, 10, 15, 20, 25, 30, 40, 50]
# Baseline rollover behavior:
# how much daily history to request when looking up the previous close,
# and whether a changed baseline is logged.
BASELINE_HISTORY_PERIOD = "14d"
LOG_BASELINE_ROLLOVERS = True
# Ticker symbol -> phrase for spoken messages.
# NOTE(review): not referenced in the visible part of the file -- confirm usage.
SPOKEN_NAME = {
    "^VXIND1": "the VIX near-term futures",
    "^VIX": "the spot VIX index",
    "^DJI": "the Dow Jones index",
}
# VIX direction markers:
# up = worse, down = better, flat = neutral
VIX_UP_MARKER = "🔺"
VIX_DOWN_MARKER = "▽"
VIX_FLAT_MARKER = "▷"
# Split analysis into two lines (True/False).
SPLIT_ANALYSIS_LINES = True
# Alert reset / re-arm behavior.
ALERT_RESET_HYSTERESIS_PCT = 0.5  # re-arm once value drops 0.5% below threshold
LOG_ALERT_RESETS = True
# Historical VIX record levels used for record-break alerts.
# These are close-based records; change them if you prefer intraday records.
ANNOUNCE_RECORD_BREAKS = True
VIX_RECORD_HIGH = 82.69
VIX_RECORD_HIGH_DATE = "2020-03-16"
VIX_RECORD_LOW = 9.14
VIX_RECORD_LOW_DATE = "2017-11-03"
# Absolute VIX level alerts (not relative to previous close).
ANNOUNCE_ABSOLUTE_VIX_LEVELS = True
ABSOLUTE_VIX_ALERT_LEVELS = [20.0, 25.0, 30.0, 35.0, 40.0, 50.0]
# Startup-only catch-up behavior for absolute VIX levels:
# if the script starts while VIX is already above multiple levels, only announce
# the highest crossed level once, and silently mark the lower crossed levels as
# already seen for this run.
STARTUP_ONLY_HIGHEST_ABSOLUTE_VIX_LEVEL = True
# Startup-only catch-up behavior for percentage spike thresholds:
# if the script starts while the symbol is already above multiple percentage
# spike thresholds, only announce the highest crossed threshold once, and
# silently mark lower crossed thresholds as already seen for this run.
STARTUP_ONLY_HIGHEST_SPIKE_THRESHOLD = True
# Intraday/session extreme tracking: log each new session high/low.
LOG_INTRADAY_EXTREMES = True
# Version checking: a warning is logged when the installed yfinance is older
# than MIN_TESTED_YFINANCE; the PyPI lookup is opt-in and time-limited.
MIN_TESTED_YFINANCE = "1.2.0"
CHECK_UPDATES_ON_STARTUP = False
PYPI_CHECK_TIMEOUT = 3.0
# yfinance rate limits: the wait starts at the initial value and doubles
# up to the maximum; the current wait is kept in the mutable module global.
RATE_LIMIT_BACKOFF_INITIAL = 60  # seconds
RATE_LIMIT_BACKOFF_MAX = 900  # 15 min
rate_limit_backoff_seconds = RATE_LIMIT_BACKOFF_INITIAL
# Quiet heartbeat when polling continues but no new candle arrives.
HEARTBEAT_WHEN_IDLE = True
HEARTBEAT_INTERVAL_SECONDS = 300  # 5 min
| ################################################################################ | |
| # STDLIB IMPORTS | |
| ################################################################################ | |
| import argparse | |
| import atexit | |
| import datetime | |
| import importlib | |
| import importlib.util | |
| import logging | |
| import math | |
| import queue | |
| import shlex | |
| import shutil | |
| import subprocess | |
| import sys | |
| import threading | |
| import time | |
| import warnings | |
| from typing import Dict, List, Optional, Tuple | |
| ################################################################################ | |
| # BANNER / DISPLAY | |
| ################################################################################ | |
def print_banner() -> None:
    """
    Print the startup banner: app name/version and author/URL between two rules.

    The rule width follows the terminal width and falls back to 79 columns
    when the terminal size cannot be determined.
    """
    columns, _rows = shutil.get_terminal_size(fallback=(79, 24))
    rule = "-" * columns
    banner_rows = (
        rule,
        f"::: {APP_NAME} v{APP_VERSION}",
        f"::: by {APP_AUTHOR} | {APP_URL}",
        rule,
    )
    for row in banner_rows:
        print(row)
| ################################################################################ | |
| # OPTIONAL VERSION / UPDATE CHECKS | |
| ################################################################################ | |
| import json | |
| import urllib.request | |
| from importlib.metadata import PackageNotFoundError, version as dist_version | |
# `packaging` is optional at this point in the file: when it cannot be
# imported, `Version` is set to None so callers can detect that proper version
# parsing is unavailable, and `InvalidVersion` becomes a plain Exception alias
# so `except InvalidVersion:` clauses stay valid.
try:
    from packaging.version import Version, InvalidVersion
except Exception:
    Version = None
    InvalidVersion = Exception
def get_installed_version(dist_name: str) -> Optional[str]:
    """
    Return the installed version string of distribution *dist_name*.

    Returns None when the distribution is not installed, and also when the
    metadata lookup fails for any other reason.
    """
    try:
        found = dist_version(dist_name)
    except Exception:  # PackageNotFoundError included
        return None
    return found
def version_lt(installed: str, minimum: str) -> bool:
    """
    Return True when *installed* is an older version than *minimum*.

    Comparison strategy, in order:
      1. ``packaging.version.Version``. It is imported lazily inside this
         function so that a ``packaging`` install performed earlier in the
         same run is picked up.
      2. Numeric comparison of the leading dotted release numbers
         (e.g. "1.10.0" vs "1.9.0"), used when ``packaging`` is missing or
         rejects one of the strings. Pre-release suffixes are ignored here.
      3. Plain inequality, the original conservative behavior, when neither
         string starts with a number.
    """
    try:
        from packaging.version import InvalidVersion, Version
    except ImportError:
        pass
    else:
        try:
            return Version(installed) < Version(minimum)
        except InvalidVersion:
            pass

    have = _leading_release_numbers(installed)
    want = _leading_release_numbers(minimum)
    if have is None or want is None:
        # nothing comparable: only a non-match is reported
        return installed != minimum

    # Pad the shorter tuple with zeros so "1.2" and "1.2.0" compare equal.
    width = max(len(have), len(want))
    have += (0,) * (width - len(have))
    want += (0,) * (width - len(want))
    return have < want


def _leading_release_numbers(text: str) -> Optional[Tuple[int, ...]]:
    """Parse the leading numeric dotted components: '1.2.0rc1' -> (1, 2, 0)."""
    numbers = []
    for chunk in text.strip().split("."):
        digits = ""
        for ch in chunk:
            if ch not in "0123456789":
                break
            digits += ch
        if not digits:
            break
        numbers.append(int(digits))
        if len(digits) != len(chunk):
            # a suffix such as "rc1" ends the numeric release part
            break
    return tuple(numbers) if numbers else None
def log_runtime_versions() -> None:
    """
    Log the Python, yfinance and pyttsx3 versions in use.

    Also logs a warning with an upgrade command when the installed yfinance
    is older than MIN_TESTED_YFINANCE. A version that cannot be determined is
    reported as "unknown" and skips that check.
    """
    python_version = sys.version.split()[0]
    yfinance_version = get_installed_version("yfinance") or "unknown"
    pyttsx3_version = get_installed_version("pyttsx3") or "unknown"

    logging.info(
        f"Runtime versions: Python {python_version} | yfinance {yfinance_version} | pyttsx3 {pyttsx3_version}"
    )

    if yfinance_version == "unknown":
        return
    if not version_lt(yfinance_version, MIN_TESTED_YFINANCE):
        return

    logging.warning(
        f"yfinance {yfinance_version} is older than the minimum tested version "
        f"{MIN_TESTED_YFINANCE}. Update recommended:\n"
        f" {sys.executable} -m pip install -U yfinance"
    )
def get_latest_pypi_version(project: str, timeout: float = PYPI_CHECK_TIMEOUT) -> Optional[str]:
    """
    Return the latest released version of *project* according to PyPI's JSON API.

    Args:
        project: PyPI project name, e.g. "yfinance".
        timeout: network timeout in seconds for the HTTP request.

    Returns:
        The version string from the response's "info.version" field, or None
        when the request or JSON parsing fails (a warning is logged then).
    """
    # HTTP header values must be latin-1 encodable. APP_NAME contains emoji,
    # so using it verbatim made every request raise UnicodeEncodeError before
    # anything was sent. Build an ASCII-only product token instead.
    ascii_name = APP_NAME.encode("ascii", "ignore").decode("ascii")
    product = "-".join(ascii_name.split()) or "vix-ticker"

    url = f"https://pypi.org/pypi/{project}/json"
    req = urllib.request.Request(
        url,
        headers={"User-Agent": f"{product}/{APP_VERSION}"},
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            payload = json.load(resp)
        return payload.get("info", {}).get("version")
    except Exception as exc:
        # best effort: an update check must never take the program down
        logging.warning(f"Could not check PyPI for {project}: {exc}")
        return None
def check_for_updates() -> None:
    """
    Compare the installed yfinance version with the latest one on PyPI and log the result.

    Does nothing when either version cannot be determined. When proper version
    parsing is unavailable, or rejects one of the strings, the check falls back
    to plain string inequality and words the warning as a "possible" update.
    """
    installed = get_installed_version("yfinance")
    if not installed:
        return
    latest = get_latest_pypi_version("yfinance")
    if not latest:
        return

    # Tri-state: True/False from a real comparison, None when unavailable.
    outdated = None
    if Version is not None:
        try:
            outdated = Version(installed) < Version(latest)
        except InvalidVersion:
            outdated = None

    if outdated is None:
        if installed != latest:
            logging.warning(
                f"Possible yfinance update available: installed {installed}, latest {latest}"
            )
        else:
            logging.info(f"yfinance is up to date: {installed}")
        return

    if outdated:
        logging.warning(
            f"Update available for yfinance: installed {installed}, latest {latest}. "
            f"Upgrade with:\n {sys.executable} -m pip install -U yfinance"
        )
    else:
        logging.info(f"yfinance is up to date: {installed}")
| ################################################################################ | |
| # LOGGING WITH LOCAL TIMEZONE | |
| ################################################################################ | |
def get_tz_abbreviation(dt: datetime.datetime) -> str:
    """
    Return a short local timezone label for *dt*.

    *dt* is converted to the local timezone first. Two long names
    ("FLE Daylight Time" / "FLE Standard Time") are shortened to EEST / EET;
    any other name is returned unchanged, and "LOCAL" is used when no name
    is available.
    """
    long_to_short = {
        "FLE Daylight Time": "EEST",
        "FLE Standard Time": "EET",
    }
    local_name = dt.astimezone().tzname()
    if local_name in long_to_short:
        return long_to_short[local_name]
    return local_name if local_name else "LOCAL"
class TZFormatter(logging.Formatter):
    """Log formatter that renders record times in local time with a timezone suffix."""

    def formatTime(self, record, datefmt=None):
        """
        Format the record's creation time as local time followed by a TZ label.

        Uses *datefmt* when given, otherwise "%Y-%m-%d %H:%M:%S".
        """
        local_time = datetime.datetime.fromtimestamp(record.created).astimezone()
        pattern = datefmt if datefmt else "%Y-%m-%d %H:%M:%S"
        stamp = local_time.strftime(pattern)
        return f"{stamp} {get_tz_abbreviation(local_time)}"
def setup_logging() -> None:
    """
    Configure the root logger at INFO level with a console and a file handler.

    Both handlers share one TZFormatter. Handlers already attached to the
    root logger are removed first; the file handler writes UTF-8 to LOG_FILE.
    """
    formatter = TZFormatter("%(asctime)s - %(levelname)s - %(message)s")
    # Create both handlers before touching the root logger.
    handlers = (
        logging.StreamHandler(),
        logging.FileHandler(LOG_FILE, encoding="utf-8"),
    )

    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.handlers.clear()
    for handler in handlers:
        handler.setFormatter(formatter)
        root.addHandler(handler)
| ################################################################################ | |
| # DEPENDENCY CHECK / PROMPTED INSTALL | |
| ################################################################################ | |
def is_package_installed(import_name: str) -> bool:
    """Return True when a module spec can be found for *import_name*."""
    spec = importlib.util.find_spec(import_name)
    return spec is not None
def find_missing_packages(required_packages: Dict[str, str]) -> List[Tuple[str, str]]:
    """
    Return (import_name, pip_name) pairs for every required package that is not importable.

    *required_packages* maps import names to pip package names; the result
    keeps the mapping's iteration order.
    """
    return [
        (import_name, pip_name)
        for import_name, pip_name in required_packages.items()
        if not is_package_installed(import_name)
    ]
def prompt_install_missing_packages(missing_packages: List[Tuple[str, str]]) -> None:
    """
    Interactively offer to install missing packages; never installs without consent.

    Args:
        missing_packages: (import_name, pip_name) pairs to install. An empty
            list makes this a no-op.

    Raises:
        SystemExit: when the user declines, no interactive input is available,
            the install command cannot be parsed or started, the installer
            exits with an error, or packages are still missing afterwards.
    """
    if not missing_packages:
        return

    print()
    print("Missing required packages detected:")
    for import_name, pip_name in missing_packages:
        if import_name == pip_name:
            print(f" - {pip_name}")
        else:
            print(f" - {pip_name} (import: {import_name})")
    print()

    while True:
        try:
            response = input(
                "Install these now? [y/N] "
                "(enter 'n' to abort the whole program): "
            ).strip().lower()
        except EOFError:
            # Non-interactive stdin (pipe, service, cron): nobody can consent.
            print()
            print("No interactive input available; aborting.")
            raise SystemExit(1)
        if response in ("", "n", "no"):
            print("Aborting at user request.")
            raise SystemExit(1)
        if response in ("y", "yes"):
            break
        print("Please answer y or n.")

    default_prefix = f'"{sys.executable}" -m pip install'
    try:
        install_prefix = input(
            f"Preferred install command prefix\n"
            f"[default: {default_prefix}]\n> "
        ).strip()
    except EOFError:
        # Input closed after consent was given: use the default command.
        install_prefix = ""
    if not install_prefix:
        install_prefix = default_prefix

    try:
        cmd = shlex.split(install_prefix) + [pip_name for _, pip_name in missing_packages]
    except ValueError as exc:
        # e.g. unbalanced quotes in the user-supplied prefix
        print(f"Could not parse install command prefix: {exc}")
        raise SystemExit(1)

    print()
    print("Running install command:")
    print(" " + " ".join(cmd))
    print()

    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError as exc:
        print(f"Dependency install failed: {exc}")
        raise SystemExit(1)
    except OSError as exc:
        # e.g. FileNotFoundError when the chosen installer executable does not exist
        print(f"Could not run install command: {exc}")
        raise SystemExit(1)

    # Make freshly installed packages visible to the import system.
    importlib.invalidate_caches()

    still_missing = find_missing_packages(REQUIRED_PACKAGES)
    if still_missing:
        print("Some packages are still missing after installation:")
        for import_name, pip_name in still_missing:
            print(f" - {pip_name} (import: {import_name})")
        raise SystemExit(1)
| ################################################################################ | |
| # DEPENDENCY BOOTSTRAP | |
| ################################################################################ | |
# Startup side effects, executed at module load time: show the banner,
# configure logging, then make sure the third-party dependencies exist
# (prompting the user to install them) before importing them.
print_banner()
setup_logging()
missing_packages = find_missing_packages(REQUIRED_PACKAGES)
prompt_install_missing_packages(missing_packages)
# Deferred third-party imports: placed after the dependency check on purpose,
# hence the E402 (import not at top of file) suppressions.
import yfinance as yf  # noqa: E402
import pyttsx3  # noqa: E402
from yfinance.exceptions import YFRateLimitError  # noqa: E402
# Silence FutureWarning output from here on.
warnings.simplefilter(action="ignore", category=FutureWarning)
| ################################################################################ | |
| # CMDLINE ARGS | |
| ################################################################################ | |
def parse_args() -> argparse.Namespace:
    """
    Parse the command-line options.

    Options:
        --interval N       polling interval in seconds; must be a positive
                           integer (default: DEFAULT_POLLING_INTERVAL)
        --no-tts           disable text-to-speech even if ENABLE_TTS is True
        --check-updates    check PyPI for newer package versions on startup

    Returns:
        The populated argparse namespace (attributes: interval, no_tts,
        check_updates). Invalid input ends the program through argparse's
        normal usage-error exit.
    """

    def positive_seconds(raw: str) -> int:
        # Reject zero and negative values at parse time: they are meaningless
        # as a polling interval and would only fail later.
        try:
            seconds = int(raw)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid int value: {raw!r}") from None
        if seconds <= 0:
            raise argparse.ArgumentTypeError(
                f"interval must be a positive number of seconds, got {seconds}"
            )
        return seconds

    parser = argparse.ArgumentParser(description="Monitor VIX spikes from previous close.")
    parser.add_argument(
        "--interval",
        type=positive_seconds,
        default=DEFAULT_POLLING_INTERVAL,
        help="Polling interval in seconds",
    )
    parser.add_argument(
        "--no-tts",
        action="store_true",
        help="Disable text-to-speech even if ENABLE_TTS is True",
    )
    parser.add_argument(
        "--check-updates",
        action="store_true",
        help="Check PyPI for newer package versions on startup",
    )
    return parser.parse_args()
# Parse the command line once at module load time and derive runtime settings.
args = parse_args()
POLLING_INTERVAL = args.interval
# TTS is active only when enabled in the config AND not disabled via --no-tts.
TTS_ENABLED = ENABLE_TTS and not args.no_tts
log_runtime_versions()
# The PyPI update check runs when either the config flag or --check-updates asks for it.
if CHECK_UPDATES_ON_STARTUP or args.check_updates:
    check_for_updates()
| ################################################################################ | |
| # TTS | |
| ################################################################################ | |
# Upper bound on pending speech messages.
TTS_QUEUE_MAXSIZE = 32
# When True, speak() enqueues without blocking and drops the message if the
# queue is full; when False it blocks until there is room.
TTS_DROP_WHEN_BUSY = True
# When True, speak() skips a message identical to the last one it queued.
TTS_DEDUP_CONSECUTIVE = True
# Module-level TTS state shared between speak() and the worker thread.
_tts_queue = queue.Queue(maxsize=TTS_QUEUE_MAXSIZE)
_tts_thread = None  # worker thread, created on first use
_tts_last_text = None  # last text successfully queued (for de-duplication)
_tts_worker_failed = False  # set when the TTS engine could not be initialized
def _tts_worker() -> None:
    """
    Worker thread body: speak queued messages one at a time until told to stop.

    Initializes the pyttsx3 engine first. On failure it sets
    `_tts_worker_failed`, logs the error and exits. A None item on the queue
    is the shutdown sentinel. Errors while speaking are logged and the loop
    continues with the next message.
    """
    global _tts_worker_failed

    try:
        engine = pyttsx3.init()
    except Exception as exc:
        _tts_worker_failed = True
        logging.error(f"TTS init error: {exc}")
        return

    # iter(..., None) stops as soon as the None sentinel is dequeued.
    for message in iter(_tts_queue.get, None):
        try:
            engine.say(message)
            engine.runAndWait()
        except Exception as exc:
            logging.error(f"TTS worker error: {exc}")
        finally:
            _tts_queue.task_done()

    # The sentinel itself was dequeued too; account for it before exiting.
    _tts_queue.task_done()
def ensure_tts_worker() -> None:
    """
    Start the daemon TTS worker thread if none is currently alive.

    Does nothing once the worker has reported an engine initialization failure.
    """
    global _tts_thread

    if _tts_worker_failed:
        return

    worker_running = _tts_thread is not None and _tts_thread.is_alive()
    if worker_running:
        return

    _tts_thread = threading.Thread(
        target=_tts_worker,
        name="tts-worker",
        daemon=True,
    )
    _tts_thread.start()
def stop_tts_worker() -> None:
    """
    Ask a running TTS worker to stop by queueing the None sentinel.

    Best effort: if the queue is full the request is silently dropped.
    """
    worker = _tts_thread
    if worker is None or not worker.is_alive():
        return
    try:
        _tts_queue.put_nowait(None)
    except queue.Full:
        # nothing more to do; the worker is a daemon thread
        pass


# Request a worker shutdown when the interpreter exits.
atexit.register(stop_tts_worker)
def speak(text: str) -> None:
    """
    Queue *text* for text-to-speech output.

    No-op when TTS is disabled or the worker failed to initialize. With
    TTS_DEDUP_CONSECUTIVE, a message equal to the last queued one is skipped.
    With TTS_DROP_WHEN_BUSY, a full queue drops the message with a warning
    instead of blocking.
    """
    global _tts_last_text

    if not TTS_ENABLED:
        return

    ensure_tts_worker()

    is_repeat = TTS_DEDUP_CONSECUTIVE and text == _tts_last_text
    if _tts_worker_failed or is_repeat:
        return

    enqueue = _tts_queue.put_nowait if TTS_DROP_WHEN_BUSY else _tts_queue.put
    try:
        enqueue(text)
    except queue.Full:
        logging.warning("TTS queue full; dropping speech message.")
    else:
        # remember only messages that were actually queued
        _tts_last_text = text
| ################################################################################ | |
| # HELPERS | |
| ################################################################################ | |
def safe_pct_from_baseline(current_value: float, baseline: float, sym: str, label: str) -> float:
    """
    Return the percentage change of *current_value* relative to *baseline*.

    When *baseline* is not strictly positive, a warning naming *sym* and
    *label* is logged and 0.0 is returned instead of dividing.
    """
    if not baseline > 0:
        logging.warning(
            f"{sym}: baseline is non-positive ({baseline}); forcing {label} to 0.0"
        )
        return 0.0
    return ((current_value / baseline) - 1.0) * 100.0
def reset_rate_limit_backoff() -> None:
    """Reset the module-level rate-limit wait to RATE_LIMIT_BACKOFF_INITIAL."""
    global rate_limit_backoff_seconds
    initial_wait = RATE_LIMIT_BACKOFF_INITIAL
    rate_limit_backoff_seconds = initial_wait
def increase_rate_limit_backoff() -> int:
    """
    Return the wait (seconds) to use now and double the stored wait for next time.

    The stored wait is capped at RATE_LIMIT_BACKOFF_MAX.
    """
    global rate_limit_backoff_seconds
    current_wait = rate_limit_backoff_seconds
    doubled = current_wait * 2
    if doubled > RATE_LIMIT_BACKOFF_MAX:
        doubled = RATE_LIMIT_BACKOFF_MAX
    rate_limit_backoff_seconds = doubled
    return current_wait
def get_yf_interval(seconds: int) -> str:
    """
    Translate a polling interval in seconds into a yfinance interval string.

    Exact matches (60, 120, 300, 900, 1800, 3600, 5400 seconds) map to
    "1m", "2m", "5m", "15m", "30m", "60m" and "90m". Anything below one
    minute, or without an exact match, falls back to "1m" with a warning.
    """
    if seconds < 60:
        logging.warning(
            f"Polling interval {seconds}s is less than 1 minute. Using '1m' for yfinance data."
        )
        return "1m"

    exact_matches = {
        60: "1m",
        120: "2m",
        300: "5m",
        900: "15m",
        1800: "30m",
        3600: "60m",
        5400: "90m",
    }
    mapped = exact_matches.get(seconds)
    if mapped is not None:
        return mapped

    logging.warning(
        f"Polling interval {seconds}s does not map directly to a standard "
        f"yfinance intraday interval. Using '1m'."
    )
    return "1m"
def format_dt_index_value(ts) -> str:
    """
    Render an index timestamp as "YYYY-MM-DD HH:MM:SS".

    Objects offering `to_pydatetime()` are converted through it: timezone-aware
    values are shifted to UTC first, naive values are formatted as they are.
    Anything else is returned as `str(ts)`.
    """
    if not hasattr(ts, "to_pydatetime"):
        return str(ts)

    moment = ts.to_pydatetime()
    if moment.tzinfo is not None:
        moment = moment.astimezone(datetime.timezone.utc)
    return moment.strftime("%Y-%m-%d %H:%M:%S")
def get_latest_close_and_timestamp(data, sym: str) -> Tuple[Optional[float], Optional[str]]:
    """
    Extract the most recent non-NaN close and its timestamp for *sym*.

    Works with both column layouts of a yfinance download dataframe:
      - MultiIndex columns: ('Close', '^VIX')
      - Single-level columns: 'Close'

    Returns (None, None) when the column is missing, holds no values, or any
    error occurs while reading it.
    """
    nothing = (None, None)
    try:
        multi_level = getattr(data.columns, "nlevels", 1) > 1
        close_key = ("Close", sym) if multi_level else "Close"
        if close_key not in data.columns:
            return nothing

        closes = data[close_key].dropna()
        if closes.empty:
            return nothing

        latest_value = float(closes.iloc[-1])
        latest_stamp = format_dt_index_value(closes.index[-1])
        return latest_value, latest_stamp
    except Exception:
        # best effort: any unexpected dataframe shape counts as "no data"
        return nothing
def get_reference_session_date(current_timestamp: Optional[str] = None) -> datetime.date:
    """
    Return the session/reference date.

    When *current_timestamp* is a "YYYY-MM-DD HH:MM:SS" string, its date part
    is returned. Otherwise, or when parsing fails, today's UTC date is used.
    The original author's note: using the UTC date is sufficient for U.S.
    market intraday timestamps in this script because the trading session
    does not cross UTC midnight.
    """
    if not current_timestamp:
        return datetime.datetime.now(datetime.timezone.utc).date()

    try:
        parsed = datetime.datetime.strptime(current_timestamp, "%Y-%m-%d %H:%M:%S")
    except Exception:
        return datetime.datetime.now(datetime.timezone.utc).date()
    return parsed.date()
def get_previous_trading_close_info(
    ticker: str,
    reference_date: datetime.date,
) -> Tuple[Optional[float], Optional[str]]:
    """
    Look up the last completed daily close strictly BEFORE *reference_date*.

    Daily history covering BASELINE_HISTORY_PERIOD is requested from yfinance.

    Returns:
        (close_value, "YYYY-MM-DD" close date), or (None, None) when the fetch
        fails or is rate-limited, or when no usable earlier close exists. Every
        failure path logs its reason.
    """
    unavailable = (None, None)

    try:
        daily = yf.Ticker(ticker).history(
            period=BASELINE_HISTORY_PERIOD,
            interval="1d",
            auto_adjust=False,
        )
    except YFRateLimitError:
        logging.warning(
            f"{ticker}: Yahoo rate-limited baseline fetch. Will retry later."
        )
        return unavailable
    except Exception as exc:
        logging.error(
            f"{ticker}: error while fetching previous trading close: {exc}"
        )
        return unavailable

    has_close_column = daily is not None and not daily.empty and "Close" in daily.columns
    if not has_close_column:
        logging.warning(
            f"No usable daily history returned for {ticker}. Cannot determine previous close yet."
        )
        return unavailable

    daily_closes = daily["Close"].dropna()
    if daily_closes.empty:
        logging.warning(
            f"Daily history for {ticker} contains no usable close values. Cannot determine previous close yet."
        )
        return unavailable

    try:
        # Keep only sessions dated before the reference date.
        earlier = daily_closes[daily_closes.index.date < reference_date]
        if earlier.empty:
            logging.warning(
                f"No completed daily close earlier than {reference_date.isoformat()} "
                f"for {ticker}. Cannot determine previous close yet."
            )
            return unavailable

        previous_close = float(earlier.iloc[-1])
        previous_date = earlier.index[-1].strftime("%Y-%m-%d")
        if math.isnan(previous_close):
            logging.warning(
                f"Previous trading close for {ticker} ({previous_date}) is NaN. "
                f"Cannot determine baseline yet."
            )
            return unavailable
        return previous_close, previous_date
    except Exception as exc:
        logging.error(
            f"Unexpected error getting previous trading close for {ticker}: {exc}"
        )
        return unavailable
def build_alert_levels_for_symbol(sym: str, baseline: float) -> list:
    """
    Build the spike-threshold definitions for one symbol from its baseline.

    One dict is produced per entry of SPIKE_THRESHOLDS_PCT, with the keys
    "name", "pct", "threshold" (baseline raised by pct percent), "action"
    and "enabled". *sym* is accepted for interface symmetry and is not used
    in the computation.
    """
    return [
        {
            "name": f"{pct}% Spike",
            "pct": pct,
            "threshold": baseline * (1.0 + pct / 100.0),
            "action": run_custom_script,
            "enabled": True,
        }
        for pct in SPIKE_THRESHOLDS_PCT
    ]
def log_thresholds_for_symbol(sym: str, reference_date: datetime.date) -> None:
    """
    Log the stored baseline of *sym* and every enabled threshold derived from it.

    The baseline line also states the previous-close date and the session
    date (*reference_date*).
    """
    baseline = baseline_dict[sym]
    baseline_date = baseline_date_dict[sym]
    logging.info(
        f"Baseline for {sym}: {baseline:.2f} "
        f"(previous close date: {baseline_date}; "
        f"session date: {reference_date.isoformat()})"
    )

    enabled_levels = (lvl for lvl in alert_levels[sym] if lvl["enabled"])
    for lvl in enabled_levels:
        logging.info(f" {sym} -> {lvl['name']}: {lvl['threshold']:.2f}")
def refresh_baseline_for_symbol(
    sym: str,
    reference_date: datetime.date,
    force: bool = False,
) -> bool:
    """
    Re-fetch the previous trading close for *sym* and adopt it as the baseline.

    The baseline is replaced when *force* is set, when none is stored yet, or
    when the fetched close or its date differs from the stored one.

    Returns True if the baseline was (re)applied, False if the fetch failed or
    nothing changed.

    On change:
      - baseline_dict[sym] and baseline_date_dict[sym] are updated
      - alert thresholds are rebuilt
      - triggered alerts are cleared/re-armed
      - the new baseline and thresholds are logged
    """
    fetched_close, fetched_date = get_previous_trading_close_info(sym, reference_date)
    if fetched_close is None or fetched_date is None:
        return False

    prior_close = baseline_dict.get(sym)
    prior_date = baseline_date_dict.get(sym)

    # Unchanged only if: not forced, a baseline exists, same date, same value.
    if not force and prior_close is not None and prior_date == fetched_date:
        if math.isclose(prior_close, fetched_close, rel_tol=0.0, abs_tol=1e-12):
            return False

    baseline_dict[sym] = fetched_close
    baseline_date_dict[sym] = fetched_date
    alert_levels[sym] = build_alert_levels_for_symbol(sym, fetched_close)
    triggered_alerts[sym].clear()

    if prior_close is None:
        logging.info(
            f"Fetched baseline for {sym}: {fetched_close:.2f} "
            f"(previous trading close: {fetched_date}; "
            f"session date: {reference_date.isoformat()})"
        )
    elif LOG_BASELINE_ROLLOVERS:
        logging.info(
            f"🔄 {sym} baseline rolled from {prior_close:.2f} ({prior_date}) "
            f"to {fetched_close:.2f} ({fetched_date}) "
            f"for session date {reference_date.isoformat()}; "
            f"thresholds rebuilt and alerts re-armed."
        )

    log_thresholds_for_symbol(sym, reference_date)
    return True
def severity_for_absolute_vix(value: float) -> Tuple[str, str]:
    """
    Classify an absolute VIX reading into a fear band.

    Returns (emoji, label) of the first band whose exclusive upper bound lies
    above *value*; readings at or beyond the last bound count as "meltdown".
    The bands are custom, not an official scale.
    """
    bands = (
        (12, "😴", "complacent"),
        (20, "🙂", "normal-ish"),
        (25, "😐", "elevated"),
        (30, "😰", "stressed"),
        (32, "🚨😬", "high fear"),
        (35, "🚨😫", "acute fear"),
        (40, "🚨😱", "panic"),
        (50, "🚨💀⚠️", "crisis"),
    )
    for upper_bound, emoji, label in bands:
        if value < upper_bound:
            return emoji, label
    return "🚨☠️💥", "meltdown"
def get_vix_direction_marker(difference: float) -> str:
    """
    Pick the direction marker for a VIX change.

    For VIX:
      - positive difference (up = worse)   -> VIX_UP_MARKER
      - negative difference (down = better) -> VIX_DOWN_MARKER
      - anything else (flat)                -> VIX_FLAT_MARKER
    """
    rising = difference > 0
    falling = difference < 0
    if rising:
        return VIX_UP_MARKER
    return VIX_DOWN_MARKER if falling else VIX_FLAT_MARKER
def format_date_for_tts(date_str: str) -> str:
    """
    Turn a "YYYY-MM-DD" string into a speech-friendly date.

    Example: "2020-03-16" -> "March 16, 2020". A leading zero of the day is
    dropped ("2017-11-03" -> "November 3, 2017"). Raises ValueError when
    *date_str* does not match the expected format.
    """
    parsed = datetime.datetime.strptime(date_str, "%Y-%m-%d")
    spoken = parsed.strftime("%B %d, %Y")
    # "March 05, 2020" -> "March 5, 2020"
    return spoken.replace(" 0", " ")
def check_and_announce_vix_record_break(sym: str, current_value: float) -> None:
    """
    Log and speak a new ^VIX record high or low, at most once each per run.

    The live value is compared with the configured historical records
    (VIX_RECORD_HIGH / VIX_RECORD_LOW). Nothing happens for other symbols or
    when ANNOUNCE_RECORD_BREAKS is off. The per-run "already announced" flags
    live in record_break_state[sym].
    """
    if not ANNOUNCE_RECORD_BREAKS or sym != "^VIX":
        return

    flags = record_break_state[sym]

    # New all-time high
    broke_high = current_value > VIX_RECORD_HIGH and not flags["high_announced"]
    if broke_high:
        spoken_date = format_date_for_tts(VIX_RECORD_HIGH_DATE)
        logging.critical(
            f"💥 ^VIX NEW RECORD HIGH: {current_value:.2f} exceeded previous record "
            f"{VIX_RECORD_HIGH:.2f} from {VIX_RECORD_HIGH_DATE}!"
        )
        high_msg = (
            f"Warning! The spot VIX index has surpassed its previous record high of "
            f"{VIX_RECORD_HIGH:.2f}, set on {spoken_date}. "
            f"The current reading is {current_value:.2f}. "
            f"This is a new all time high."
        )
        logging.info(f"[TTS Message] {high_msg}")
        speak(high_msg)
        flags["high_announced"] = True

    # New all-time low
    broke_low = current_value < VIX_RECORD_LOW and not flags["low_announced"]
    if broke_low:
        spoken_date = format_date_for_tts(VIX_RECORD_LOW_DATE)
        logging.warning(
            f"🕊️ ^VIX NEW RECORD LOW: {current_value:.2f} went below previous record "
            f"{VIX_RECORD_LOW:.2f} from {VIX_RECORD_LOW_DATE}!"
        )
        low_msg = (
            f"Smooth sailing! The spot VIX index has gone below its previous record low of "
            f"{VIX_RECORD_LOW:.2f}, set on {spoken_date}. "
            f"The current reading is {current_value:.2f}. "
            f"This is a new all time low."
        )
        logging.info(f"[TTS Message] {low_msg}")
        speak(low_msg)
        flags["low_announced"] = True
def init_intraday_state_for_symbol() -> Dict[str, Optional[object]]:
    """
    Return a fresh, empty intraday-extremes record for one symbol.

    The record has the keys "high", "high_ts", "low" and "low_ts", all None.
    It only stores what has already been tracked/logged; the actual values
    are derived from yfinance OHLC data elsewhere.
    """
    return dict.fromkeys(("high", "high_ts", "low", "low_ts"))
def get_session_intraday_extremes(
    data,
    sym: str,
) -> Tuple[Optional[float], Optional[str], Optional[float], Optional[str]]:
    """
    Compute the highest High and lowest Low in the downloaded OHLC data for *sym*.

    Handles both MultiIndex columns (('High', sym) / ('Low', sym)) and
    single-level columns ('High' / 'Low').

    Returns:
        (session_high, session_high_ts, session_low, session_low_ts), or four
        Nones when a column is missing, empty, or an error occurs.
    """
    missing = (None, None, None, None)
    try:
        if getattr(data.columns, "nlevels", 1) > 1:
            high_key, low_key = ("High", sym), ("Low", sym)
        else:
            high_key, low_key = "High", "Low"

        if high_key not in data.columns or low_key not in data.columns:
            return missing

        highs = data[high_key].dropna()
        lows = data[low_key].dropna()
        if highs.empty or lows.empty:
            return missing

        peak = float(highs.max())
        trough = float(lows.min())
        peak_ts = format_dt_index_value(highs.idxmax())
        trough_ts = format_dt_index_value(lows.idxmin())
        return peak, peak_ts, trough, trough_ts
    except Exception:
        # best effort: any unexpected dataframe shape counts as "no data"
        return missing
def update_intraday_extremes_from_data(data, sym: str) -> None:
    """
    Record (and optionally log) a new intraday high and/or low for *sym*.

    The extremes come from the OHLC data. The stored state in
    intraday_extremes[sym] is updated only when a new high exceeds, or a new
    low undercuts, the stored value, or when nothing is stored yet.
    """
    peak, peak_ts, trough, trough_ts = get_session_intraday_extremes(data, sym)
    if peak is None or trough is None:
        return

    tracked = intraday_extremes[sym]

    known_high = tracked["high"]
    if known_high is None or peak > known_high:
        tracked["high"] = peak
        tracked["high_ts"] = peak_ts
        if LOG_INTRADAY_EXTREMES:
            logging.info(
                f"📈 {sym} true intraday high: {peak:.2f} "
                f"(timestamp [UTC]: {peak_ts})"
            )

    known_low = tracked["low"]
    if known_low is None or trough < known_low:
        tracked["low"] = trough
        tracked["low_ts"] = trough_ts
        if LOG_INTRADAY_EXTREMES:
            logging.info(
                f"📉 {sym} true intraday low: {trough:.2f} "
                f"(timestamp [UTC]: {trough_ts})"
            )
def build_absolute_vix_level_message(level: float, current_value: float) -> str:
    """
    Compose the spoken message for a crossed absolute VIX level.

    The opening word and closing remark escalate with *level* (tiers at 50,
    40 and 35; everything below uses the mildest wording). The middle part
    always states the crossed level and the current reading.
    """
    tiers = (
        (50, "Critical warning!", "This is extreme panic territory."),
        (40, "Warning!", "This is crisis territory."),
        (35, "Warning!", "This is severe market stress."),
    )
    opener, remark = "Attention!", "Fear is elevated."
    for floor, tier_opener, tier_remark in tiers:
        if level >= floor:
            opener, remark = tier_opener, tier_remark
            break

    return (
        f"{opener} The spot VIX index has crossed {level:.0f}. "
        f"The current reading is {current_value:.2f}. "
        f"{remark}"
    )
def announce_absolute_vix_level(
    sym: str,
    current_value: float,
    level: float,
    startup_catchup: bool = False,
) -> None:
    """
    Log and speak one absolute-level crossing, then mark the level as announced.

    *startup_catchup* only adds a "startup catch-up: " tag to the log line.
    The level is remembered under the key str(level) in
    absolute_level_alerts[sym].
    """
    tag = "startup catch-up: " if startup_catchup else ""
    logging.warning(
        f"🚨 {sym} {tag}absolute level crossed: "
        f"{current_value:.2f} >= {level:.2f}"
    )

    spoken = build_absolute_vix_level_message(level, current_value)
    logging.info(f"[TTS Message] {spoken}")
    speak(spoken)

    absolute_level_alerts[sym].add(str(level))
def check_and_announce_absolute_vix_levels(
    sym: str,
    current_value: float,
    startup_catchup: bool = False,
) -> None:
    """
    Announce absolute ^VIX level crossings (the ABSOLUTE_VIX_ALERT_LEVELS list).

    Normal mode: every level at or below *current_value* that has not been
    announced yet is announced, in list order.

    Startup catch-up mode (*startup_catchup* set and
    STARTUP_ONLY_HIGHEST_ABSOLUTE_VIX_LEVEL enabled):
      - only the highest crossed level is announced
      - all crossed levels are marked as already seen
      - later polls behave normally again

    No-op for other symbols or when ANNOUNCE_ABSOLUTE_VIX_LEVELS is off.
    """
    if not ANNOUNCE_ABSOLUTE_VIX_LEVELS or sym != "^VIX":
        return

    crossed = [lvl for lvl in ABSOLUTE_VIX_ALERT_LEVELS if current_value >= lvl]
    if not crossed:
        return

    if startup_catchup and STARTUP_ONLY_HIGHEST_ABSOLUTE_VIX_LEVEL:
        top_level = max(crossed)
        if str(top_level) not in absolute_level_alerts[sym]:
            announce_absolute_vix_level(
                sym=sym,
                current_value=current_value,
                level=top_level,
                startup_catchup=True,
            )
        # Mark every already-crossed band as seen so the lower ones do not
        # spam on the next poll while the value is still above them.
        absolute_level_alerts[sym].update(str(lvl) for lvl in crossed)
        return

    for lvl in crossed:
        if str(lvl) in absolute_level_alerts[sym]:
            continue
        announce_absolute_vix_level(
            sym=sym,
            current_value=current_value,
            level=lvl,
            startup_catchup=False,
        )
def process_absolute_vix_level_resets(sym: str, current_value: float) -> None:
    """
    Re-arm absolute VIX level alerts once the index has dropped far enough
    below them.

    Only "^VIX" is handled. A previously announced level is re-armed when the
    current value is below the level reduced by ALERT_RESET_HYSTERESIS_PCT
    percent. Each re-arm is logged when LOG_ALERT_RESETS is enabled.
    """
    if sym != "^VIX":
        return

    for level in ABSOLUTE_VIX_ALERT_LEVELS:
        key = str(level)
        if key in absolute_level_alerts[sym]:
            rearm_below = level * (1.0 - ALERT_RESET_HYSTERESIS_PCT / 100.0)
            if current_value < rearm_below:
                absolute_level_alerts[sym].discard(key)
                if LOG_ALERT_RESETS:
                    logging.info(
                        f"↩️ {sym} reset: absolute level {level:.2f} re-armed "
                        f"(current {current_value:.2f} < reset level {rearm_below:.2f})"
                    )
def log_vix_snapshot(
    sym: str,
    current_value: float,
    arrow: str,
    difference: float,
    pct_change: float,
    baseline: float,
    baseline_pct: float,
    sev_emoji: str,
    sev_label: str,
    current_timestamp: str,
    is_bootstrap_sample: bool = False,
) -> None:
    """
    Write the market snapshot for one symbol to the log.

    The first sample of a run (is_bootstrap_sample) is labelled as such and
    omits the direction marker and point/percent move. Every other sample
    includes them.

    SPLIT_ANALYSIS_LINES selects the layout:
    - enabled: two log lines, with severity and data timestamp on the second
    - disabled: one combined log line
    """
    baseline_part = f"| baseline {baseline:.2f} ({baseline_pct:+.2f}%)"

    if is_bootstrap_sample:
        primary_msg = (
            f"{sym}: {current_value:.2f} "
            f"| first sample this run {baseline_part}"
        )
    else:
        movement = f"({difference:+.2f} pts, {pct_change:+.2f}%)"
        primary_msg = f"{sym}: {current_value:.2f} {arrow} {movement} {baseline_part}"

    severity_part = (
        f"severity {sev_emoji} {sev_label} "
        f"| data timestamp [UTC]: {current_timestamp}"
    )

    if not SPLIT_ANALYSIS_LINES:
        logging.info(f"{primary_msg} | {severity_part}")
        return

    logging.info(primary_msg)
    # The second line carries a leading space before the severity text.
    logging.info(f" {severity_part}")
def process_alert_resets(
    sym: str,
    current_value: float,
    alert_levels: Dict[str, list],
    triggered_alerts: Dict[str, set],
) -> None:
    """
    Re-arm triggered alerts after the value has fallen back far enough below
    their thresholds.

    Disabled levels and levels that have not triggered are left alone. A
    triggered level is re-armed when the current value is below its threshold
    reduced by ALERT_RESET_HYSTERESIS_PCT percent; that margin avoids flapping
    while the price hovers right around a threshold. Each re-arm is logged
    when LOG_ALERT_RESETS is enabled.
    """
    for entry in alert_levels[sym]:
        if not entry["enabled"]:
            continue

        name, limit = entry["name"], entry["threshold"]
        if name in triggered_alerts[sym]:
            rearm_below = limit * (1.0 - ALERT_RESET_HYSTERESIS_PCT / 100.0)
            if current_value < rearm_below:
                triggered_alerts[sym].discard(name)
                if LOG_ALERT_RESETS:
                    logging.info(
                        f"↩️ {sym} reset: {name} re-armed "
                        f"(current {current_value:.2f} < reset level {rearm_below:.2f}; "
                        f"threshold was {limit:.2f})"
                    )
def check_and_announce_spike_thresholds(
    sym: str,
    current_value: float,
    startup_catchup: bool = False,
) -> None:
    """
    Announce spike thresholds that the current value has reached.

    Only enabled levels of alert_levels[sym] are considered. For each
    announced level a warning is logged and the level's "action" callable is
    invoked with (sym, name, current_value, threshold).

    Startup catch-up mode (startup_catchup is True and
    STARTUP_ONLY_HIGHEST_SPIKE_THRESHOLD is enabled):
    - only the crossed level with the highest threshold is announced
    - every crossed level is then marked as already triggered
    Otherwise each crossed level that has not triggered yet is announced and
    marked individually.
    """
    active = [entry for entry in alert_levels[sym] if entry["enabled"]]
    if not active:
        return

    crossed = [entry for entry in active if current_value >= entry["threshold"]]
    if not crossed:
        return

    fired = triggered_alerts[sym]

    def fire(entry, tag=""):
        # Log the spike and run the level's configured action.
        logging.warning(
            f"🚨 {sym} {tag}spike detected: {entry['name']} "
            f"(≥ {entry['threshold']:.2f})!"
        )
        entry["action"](sym, entry["name"], current_value, entry["threshold"])

    if startup_catchup and STARTUP_ONLY_HIGHEST_SPIKE_THRESHOLD:
        top = max(crossed, key=lambda entry: entry["threshold"])
        if top["name"] not in fired:
            fire(top, tag="startup catch-up: ")
        # Lower crossed thresholds are recorded without being announced.
        fired.update(entry["name"] for entry in crossed)
        return

    for entry in crossed:
        if entry["name"] not in fired:
            fire(entry)
            fired.add(entry["name"])
def maybe_log_idle_heartbeat(
    sym: str,
    current_timestamp: str,
    current_value: float,
) -> None:
    """
    Emit a throttled sign-of-life log line while polling continues but the
    latest candle timestamp has not advanced.

    Does nothing when HEARTBEAT_WHEN_IDLE is off. Otherwise logs at most once
    per HEARTBEAT_INTERVAL_SECONDS per symbol; a stored last-log time of 0.0
    means the next call logs immediately.
    """
    if not HEARTBEAT_WHEN_IDLE:
        return

    now = time.time()
    previous_heartbeat = idle_heartbeat_log_times.get(sym, 0.0)
    if previous_heartbeat and (now - previous_heartbeat) < HEARTBEAT_INTERVAL_SECONDS:
        return

    # Idle time is reported as 0 until a first new candle has been recorded.
    last_candle_seen = last_new_data_epochs.get(sym, 0.0)
    idle_for_s = 0
    if last_candle_seen > 0:
        idle_for_s = int(now - last_candle_seen)

    duplicate_polls = idle_duplicate_poll_counts.get(sym, 0)

    logging.info(
        f"{sym}: still polling; no new candle yet "
        f"(latest timestamp [UTC]: {current_timestamp}, "
        f"last value: {current_value:.2f}, "
        f"idle for ~{idle_for_s}s, duplicate polls: {duplicate_polls})"
    )
    idle_heartbeat_log_times[sym] = now
| ################################################################################ | |
| # ALERT FUNCTIONS | |
| ################################################################################ | |
def run_custom_script(symbol: str, threshold_label: str, current_value: float, threshold: float) -> None:
    """
    Alert action: log and speak a spike announcement for one threshold.

    symbol: ticker symbol that triggered the alert
    threshold_label: name of the threshold that was reached
    current_value: latest reading
    threshold: the threshold value that was reached

    The spoken message reports the baseline, the current value and the
    percentage move from baseline. From 20 percent upwards a commentary
    suffix is appended, escalating with the size of the spike.
    """
    logging.info(
        f"[run_custom_script] {symbol}: {threshold_label} threshold reached. "
        f"Current: {current_value:.2f}, Threshold: {threshold:.2f}"
    )

    baseline = baseline_dict[symbol]
    actual_spike_pct = safe_pct_from_baseline(
        current_value,
        baseline,
        symbol,
        "actual_spike_pct",
    )
    friendly_symbol = SPOKEN_NAME.get(symbol, symbol)

    msg = (
        f"Attention! {friendly_symbol} triggered {threshold_label}. "
        f"Baseline was {baseline:.2f}, and it is now {current_value:.2f}, "
        f"a total spike of {actual_spike_pct:.2f} percent from baseline."
    )

    # Highest floor first; only the first matching commentary is appended.
    commentary_tiers = (
        (90, " DROP THE BOMB! EXTERMINATE THEM ALL!"),
        (80, " WE ARE DOOMED! THIS IS THE END!"),
        (60, " YAHOO-HO-HO-HOO! LOOKS LIKE WE'RE GONE AND NEVER COMING BACK!"),
        (50, " Oh wow. Looks like we're pretty much screwed."),
        (40, " System meltdown is imminent. The index is now in panic territory."),
        (30, " This is a red alert scenario. Conditions are extremely volatile."),
        (20, " Warning: volatility is approaching meltdown levels."),
    )
    for floor, commentary in commentary_tiers:
        if actual_spike_pct >= floor:
            msg += commentary
            break

    logging.info(f"[TTS Message] {msg}")
    speak(msg)
| ################################################################################ | |
| # SETUP | |
| ################################################################################ | |
# Derive the yfinance data interval from the configured polling interval.
YF_INTERVAL_STRING = get_yf_interval(POLLING_INTERVAL)
logging.info(f"Using polling interval: {POLLING_INTERVAL}s")
logging.info(f"Using yfinance data interval: {YF_INTERVAL_STRING}")

tickers = list(DEFAULT_TICKERS)

# Per-symbol runtime state, keyed by ticker symbol. These start empty and are
# seeded further down or filled in as data arrives.
baseline_dict: Dict[str, float] = {}
baseline_date_dict: Dict[str, str] = {}
session_date_dict: Dict[str, datetime.date] = {}
previous_values: Dict[str, Optional[float]] = {}
last_data_timestamps: Dict[str, Optional[str]] = {}
idle_heartbeat_log_times: Dict[str, float] = {}
idle_duplicate_poll_counts: Dict[str, int] = {}
last_new_data_epochs: Dict[str, float] = {}

# Alert bookkeeping: each symbol gets its own list/set/dict instance.
alert_levels: Dict[str, list] = {sym: [] for sym in tickers}
triggered_alerts: Dict[str, set] = {sym: set() for sym in tickers}
absolute_level_alerts: Dict[str, set] = {sym: set() for sym in tickers}
record_break_state: Dict[str, Dict[str, bool]] = {
    sym: {"high_announced": False, "low_announced": False}
    for sym in tickers
}
intraday_extremes: Dict[str, Dict[str, Optional[object]]] = {
    sym: init_intraday_state_for_symbol()
    for sym in tickers
}

logging.info("Starting VIX watchers...")

startup_session_date = get_reference_session_date()

# Every symbol begins in startup catch-up mode.
startup_catchup_pending: Dict[str, bool] = dict.fromkeys(tickers, True)

# Seed the per-symbol state with its initial values.
session_date_dict.update(dict.fromkeys(tickers, startup_session_date))
previous_values.update(dict.fromkeys(tickers))
last_data_timestamps.update(dict.fromkeys(tickers))
idle_heartbeat_log_times.update(dict.fromkeys(tickers, 0.0))
idle_duplicate_poll_counts.update(dict.fromkeys(tickers, 0))
last_new_data_epochs.update(dict.fromkeys(tickers, 0.0))

# Keep retrying, with increasing back-off, until every symbol has a baseline.
try:
    while True:
        refresh_results = [
            refresh_baseline_for_symbol(
                sym,
                reference_date=startup_session_date,
                force=True,
            )
            for sym in tickers
            if sym not in baseline_dict
        ]
        all_ready = all(refresh_results)

        if all_ready:
            reset_rate_limit_backoff()
            break

        wait_s = increase_rate_limit_backoff()
        logging.warning(
            f"Startup baseline initialization incomplete or rate-limited. Retrying in {wait_s}s..."
        )
        time.sleep(wait_s)
except KeyboardInterrupt:
    logging.info("Interrupted by user during startup. Exiting.")
    raise SystemExit(0)

if TTS_SYSTEM_TEST and TTS_ENABLED:
    speak("Volatility index monitoring started.")
| ################################################################################ | |
| # MAIN LOOP | |
| ################################################################################ | |
# Poll forever: download the latest intraday candles, analyse each symbol,
# then sleep for POLLING_INTERVAL seconds.
#
# The KeyboardInterrupt handler wraps the whole loop. KeyboardInterrupt is not
# an Exception subclass, so it passes through the inner `except Exception`
# handler, and Ctrl-C exits cleanly even while the loop is sleeping inside
# that error handler.
try:
    while True:
        try:
            try:
                data = yf.download(
                    tickers,
                    period="1d",
                    interval=YF_INTERVAL_STRING,
                    progress=False,
                    auto_adjust=False,
                    threads=False,
                )
                reset_rate_limit_backoff()
            except YFRateLimitError:
                wait_s = increase_rate_limit_backoff()
                logging.warning(
                    f"Yahoo rate-limited intraday download. Backing off for {wait_s}s."
                )
                time.sleep(wait_s)
                continue

            # Treat a missing frame the same way as an empty one.
            if data is None or data.empty:
                logging.warning(
                    f"No data returned for tickers using interval {YF_INTERVAL_STRING}. Retrying..."
                )
                time.sleep(POLLING_INTERVAL)
                continue

            for sym in tickers:
                current_value, current_timestamp = get_latest_close_and_timestamp(data, sym)

                # No usable sample in this fetch: reuse the previous value if
                # there is one, otherwise skip the symbol for this poll.
                if current_value is None or current_timestamp is None or math.isnan(current_value):
                    fallback = previous_values.get(sym)
                    if fallback is not None and not math.isnan(fallback):
                        logging.warning(
                            f"{sym}: No new usable data in last fetch "
                            f"(interval: {YF_INTERVAL_STRING}). Using fallback: {fallback:.2f}"
                        )
                        current_value = fallback
                        current_timestamp = last_data_timestamps.get(sym) or "unknown"
                    else:
                        logging.warning(f"{sym}: No data and no prior fallback. Skipping.")
                        continue

                # Same candle as the last processed one: count the duplicate
                # poll, maybe log a heartbeat, and skip the analysis.
                if (
                    SUPPRESS_DUPLICATE_TIMESTAMPS
                    and current_timestamp == last_data_timestamps.get(sym)
                ):
                    idle_duplicate_poll_counts[sym] += 1
                    maybe_log_idle_heartbeat(
                        sym=sym,
                        current_timestamp=current_timestamp,
                        current_value=current_value,
                    )
                    continue

                # Session rollover: refresh the baseline and reset the
                # per-session state before analysing the new sample.
                current_session_date = get_reference_session_date(current_timestamp)
                if current_session_date != session_date_dict.get(sym):
                    refreshed = refresh_baseline_for_symbol(
                        sym,
                        reference_date=current_session_date,
                        force=False,
                    )
                    if refreshed:
                        session_date_dict[sym] = current_session_date
                        intraday_extremes[sym] = init_intraday_state_for_symbol()
                        absolute_level_alerts[sym].clear()
                        previous_values[sym] = None
                        last_data_timestamps[sym] = None
                        idle_heartbeat_log_times[sym] = 0.0
                        idle_duplicate_poll_counts[sym] = 0
                        last_new_data_epochs[sym] = 0.0
                        startup_catchup_pending[sym] = True
                    else:
                        logging.warning(
                            f"{sym}: Failed to refresh baseline for new session date "
                            f"{current_session_date.isoformat()}. Skipping this poll and will retry next poll."
                        )
                        continue

                # Change versus the previously processed sample. The first
                # sample of a run or session has nothing to compare against.
                prev_value = previous_values.get(sym)
                if prev_value is None or math.isnan(prev_value):
                    difference = 0.0
                    pct_change = 0.0
                    arrow = VIX_FLAT_MARKER
                    is_bootstrap_sample = True
                else:
                    difference = current_value - prev_value
                    # A previous value of zero would divide by zero; report 0%.
                    pct_change = (difference / prev_value * 100.0) if prev_value else 0.0
                    arrow = get_vix_direction_marker(difference)
                    is_bootstrap_sample = False

                if sym not in baseline_dict:
                    logging.warning(
                        f"{sym}: baseline not available yet. Skipping analysis until baseline fetch succeeds."
                    )
                    continue

                baseline = baseline_dict[sym]
                baseline_pct = safe_pct_from_baseline(
                    current_value,
                    baseline,
                    sym,
                    "baseline_pct",
                )
                sev_emoji, sev_label = severity_for_absolute_vix(current_value)

                log_vix_snapshot(
                    sym=sym,
                    current_value=current_value,
                    arrow=arrow,
                    difference=difference,
                    pct_change=pct_change,
                    baseline=baseline,
                    baseline_pct=baseline_pct,
                    sev_emoji=sev_emoji,
                    sev_label=sev_label,
                    current_timestamp=current_timestamp,
                    is_bootstrap_sample=is_bootstrap_sample,
                )

                update_intraday_extremes_from_data(data, sym)

                check_and_announce_absolute_vix_levels(
                    sym,
                    current_value,
                    startup_catchup=startup_catchup_pending.get(sym, False),
                )
                check_and_announce_vix_record_break(sym, current_value)

                # Re-arm alerts that have fallen back before checking for
                # new spike-threshold crossings.
                process_alert_resets(
                    sym=sym,
                    current_value=current_value,
                    alert_levels=alert_levels,
                    triggered_alerts=triggered_alerts,
                )
                process_absolute_vix_level_resets(sym, current_value)

                check_and_announce_spike_thresholds(
                    sym,
                    current_value,
                    startup_catchup=startup_catchup_pending.get(sym, False),
                )

                # Bookkeeping for a successfully processed new sample.
                idle_duplicate_poll_counts[sym] = 0
                last_new_data_epochs[sym] = time.time()
                # 0.0 clears the heartbeat throttle, so the first idle poll
                # after new data logs a heartbeat immediately.
                idle_heartbeat_log_times[sym] = 0.0
                previous_values[sym] = current_value
                last_data_timestamps[sym] = current_timestamp
                startup_catchup_pending[sym] = False

            time.sleep(POLLING_INTERVAL)

        except Exception:
            # Top-level boundary: log the traceback and keep polling.
            logging.exception("Unhandled error in main loop")
            time.sleep(POLLING_INTERVAL)
except KeyboardInterrupt:
    logging.info("Interrupted by user. Exiting.")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment