Created
October 6, 2025 10:11
-
-
Save pligor/ee0bb214e5f2e9f0b5a27ffaf4695e11 to your computer and use it in GitHub Desktop.
🚨 Automated Economic Reset Monitor: Real-time recession risk assessment using FRED API data with tripwire alerts for market downturns
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from __future__ import annotations | |
import argparse | |
import dataclasses | |
import json | |
import os | |
import sys | |
import urllib.error | |
import urllib.parse | |
import urllib.request | |
from dataclasses import dataclass | |
from datetime import datetime, timezone | |
from typing import Dict, List, Literal, Mapping, Optional, Sequence, TypedDict | |
# Alias used in annotations for calendar dates carried as plain strings.
ISODate = str  # YYYY-MM-DD
# Root of the FRED REST API; endpoint paths are appended when request URLs are built.
FRED_API_BASE = "https://api.stlouisfed.org/fred"
# --------------------------- | |
# Series specs | |
# --------------------------- | |
@dataclass(frozen=True)
class SeriesSpec:
    """Immutable description of one FRED series tracked by the monitor."""

    id: str  # identifier passed to the FRED API as `series_id`
    name: str  # short human-readable label
    frequency_hint: Literal["daily", "monthly"]  # cadence hint; not read by the code visible in this file
    description: str  # longer free-text description
# Registry of tracked series. Keys are the internal names looked up by
# collect_indicators(); they may differ from the FRED id stored in the spec
# (e.g. "HY_OAS" -> "BAMLH0A0HYM2").
SERIES_SPECS: Mapping[str, SeriesSpec] = {
    "UNRATE": SeriesSpec("UNRATE", "Unemployment rate", "monthly", "US unemployment rate (%)"),
    "NEW_ORDERS_PROXY": SeriesSpec("NEWORDER", "New Orders (proxy)", "monthly", "Manufacturers' New Orders: Nondefense Capital Goods ex-Aircraft"),
    "HY_OAS": SeriesSpec("BAMLH0A0HYM2", "High Yield OAS", "daily", "ICE BofA US High Yield OAS (%)"),
    "DGS10": SeriesSpec("DGS10", "10Y Treasury", "daily", "10-year constant maturity Treasury yield (%)"),
    "DGS2": SeriesSpec("DGS2", "2Y Treasury", "daily", "2-year constant maturity Treasury yield (%)"),
    "CPI": SeriesSpec("CPIAUCSL", "CPI (All items)", "monthly", "CPI index (1982-84=100)"),
    "UMICH": SeriesSpec("UMCSENT", "UMich Consumer Sentiment", "monthly", "Consumer sentiment index"),
    "SP500": SeriesSpec("SP500", "S&P 500", "daily", "S&P 500 index (close)"),
}
# --------------------------- | |
# Types | |
# --------------------------- | |
class FredObs(TypedDict):
    """Typed shape of a single observation record: a date and a textual value."""

    date: ISODate
    value: str  # kept as text; the parsing code in this file treats "." or "" as missing
class FredError(TypedDict, total=False):
    """Typed shape of an error payload; both keys are optional (total=False)."""

    error_code: int
    error_message: str  # http_get_json() looks for this key in HTTP error bodies
@dataclass(frozen=True)
class IndicatorValues:
    """Observations for one series, stored as two lists.

    The fetch code in this file appends to both lists in lockstep, so
    ``values[i]`` belongs to ``dates[i]``. ``frozen=True`` only prevents
    rebinding the attributes; the lists themselves remain mutable.
    """

    dates: List[ISODate]
    values: List[Optional[float]]  # None marks a missing or unparseable observation
@dataclass(frozen=True)
class Indicators:
    """Bundle holding one IndicatorValues per tracked series."""

    unrate: IndicatorValues  # unemployment rate
    new_orders_proxy: IndicatorValues  # manufacturers' new orders proxy
    hy_oas: IndicatorValues  # high-yield option-adjusted spread
    dgs10: IndicatorValues  # 10-year Treasury yield
    dgs2: IndicatorValues  # 2-year Treasury yield
    cpi: IndicatorValues  # CPI index level
    umich: IndicatorValues  # UMich consumer sentiment
    sp500: IndicatorValues  # S&P 500 index
@dataclass(frozen=True)
class TripwireFlags:
    """Risk-side boolean signals, as computed by compute_tripwires()."""

    hy_oas_spike: bool  # latest HY OAS is >= 1.5 above the low of the last 63 non-missing observations
    unemp_3m_up_0p5: bool  # 3-observation average of UNRATE is >= 0.5 above its value six observations earlier
    orders_contraction: bool  # orders YoY (3-observation average) is <= the tripwire threshold
    curve_inverted: bool  # 10Y minus 2Y yield is below -0.25
@dataclass(frozen=True)
class SupportFlags:
    """Supportive boolean signals, as computed by compute_supports()."""

    cpi_yoy_downtrend: bool  # the last three CPI YoY readings are non-increasing
    curve_positive: bool  # 10Y minus 2Y yield is above 0.25
    orders_expansion: bool  # orders YoY (3-observation average) is >= the support threshold
    hy_oas_low: bool  # latest HY OAS is <= 4.5
# The three regime labels assess_regime() can produce.
Regime = Literal["HARD_RESET_RISK", "SOFT_RESET_BASE", "NO_RESET"]
@dataclass(frozen=True)
class RegimeAssessment:
    """Outcome of assess_regime(): the regime plus the signals that were on."""

    regime: Regime
    tripwires_on: List[str]  # labels of active tripwire flags
    supports_on: List[str]  # labels of active support flags
@dataclass(frozen=True)
class CallToAction:
    """Human-readable guidance attached to a regime (see cta_for())."""

    headline: str  # one-line summary
    bullets: List[str]  # individual action items, printed one per line
@dataclass(frozen=True)
class MonitorReport:
    """Complete result of one monitor run; printed and optionally saved as JSON."""

    as_of: ISODate  # date the report was produced
    regime: RegimeAssessment
    indicators_snapshot: Mapping[str, Optional[float]]  # label -> latest reading (None if unavailable)
    call_to_action: CallToAction
# --------------------------- | |
# Helpers | |
# --------------------------- | |
def iso_today() -> ISODate:
    """Return the current UTC calendar date formatted as ``YYYY-MM-DD``."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.date().isoformat()
def months_back(n: int) -> ISODate:
    """Return the first day of the month ``n`` months before the current UTC month.

    Args:
        n: Number of months to step back. ``0`` gives the first day of the
            current month; a negative value steps forward in time.

    Returns:
        A ``YYYY-MM-01`` string.
    """
    today = datetime.now(timezone.utc).date()
    # Work on a zero-based absolute month index so floor division handles
    # year borrow (n > 0) and year carry (n < 0) alike; the previous
    # borrow-only loop produced month numbers above 12 for negative n.
    absolute_month = today.year * 12 + (today.month - 1) - n
    year, month_index = divmod(absolute_month, 12)
    return f"{year:04d}-{month_index + 1:02d}-01"
def moving_average(seq: List[float], window: int) -> List[float]:
    """Return the trailing simple moving average of ``seq``.

    One value is emitted for every position that has a full ``window`` of
    history, so the result has ``len(seq) - window + 1`` entries (none when
    ``seq`` is shorter than ``window``). A running sum is maintained instead
    of re-summing each window.
    """
    averages: List[float] = []
    running = 0.0
    for count, value in enumerate(seq, start=1):
        running += value
        if count < window:
            continue  # not enough history yet
        if count > window:
            # Drop the element that just slid out of the window.
            running -= seq[count - 1 - window]
        averages.append(running / float(window))
    return averages
def latest_value(v: IndicatorValues) -> Optional[float]:
    """Return the last non-missing entry of ``v.values``, or None if there is none."""
    newest_first = reversed(v.values)
    return next((item for item in newest_first if item is not None), None)
# --------------------------- | |
# FRED fetch with diagnostics | |
# --------------------------- | |
def fred_observations_url(series_id: str, start: ISODate, end: ISODate, api_key: str) -> str:
    """Build the ``series/observations`` request URL for one series.

    The query asks for JSON output over the ``start``..``end`` observation
    window. Note that the API key is part of the returned URL.
    """
    query_pairs = [
        ("series_id", series_id),
        ("observation_start", start),
        ("observation_end", end),
        ("api_key", api_key),
        ("file_type", "json"),
    ]
    query = urllib.parse.urlencode(query_pairs)
    return f"{FRED_API_BASE}/series/observations?{query}"
def http_get_json(url: str, *, user_agent: str, timeout_sec: int = 30) -> Dict[str, object]:
    """GET ``url`` and return the response body parsed as a JSON object.

    Every failure mode is reported as ``RuntimeError`` so callers need a
    single ``except`` clause.

    Args:
        url: Fully-formed request URL. It is embedded in error messages for
            diagnostics, so any secret in the query string will appear there.
        user_agent: Value sent in the ``User-Agent`` header.
        timeout_sec: Timeout in seconds passed to ``urlopen``.

    Returns:
        The decoded top-level JSON object.

    Raises:
        RuntimeError: On an HTTP error status, a network/timeout failure, a
            body that is not valid UTF-8 or JSON, or a JSON payload whose
            top level is not an object.
    """
    req = urllib.request.Request(url, headers={"User-Agent": user_agent})
    try:
        with urllib.request.urlopen(req, timeout=timeout_sec) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as e:
        # Deliberately best-effort: failing to read the diagnostic body must
        # never mask the HTTP error being reported.
        try:
            body = e.read().decode("utf-8", errors="replace")
        except Exception:
            body = ""
        try:
            err_obj = json.loads(body)
        except ValueError:
            err_obj = None
        if isinstance(err_obj, dict) and "error_message" in err_obj:
            raise RuntimeError(f"HTTP {e.code} {e.reason} from FRED: {err_obj['error_message']} | url={url}") from None
        raise RuntimeError(f"HTTP {e.code} {e.reason} from FRED | url={url} | body_snippet={body[:200]}") from None
    except urllib.error.URLError as e:
        raise RuntimeError(f"Network error: {e.reason} | url={url}") from None
    except OSError as e:
        # Socket timeouts / resets raised while reading the body are not
        # wrapped in URLError; report them the same way.
        raise RuntimeError(f"Network error: {e} | url={url}") from None

    try:
        data = raw.decode("utf-8")
    except UnicodeDecodeError as ex:
        raise RuntimeError(f"JSON parse error: {ex} | url={url} | data_snippet={raw[:200]!r}") from None
    try:
        parsed = json.loads(data)
    except ValueError as ex:
        raise RuntimeError(f"JSON parse error: {ex} | url={url} | data_snippet={data[:200]}") from None
    # Explicit check rather than `assert`, which is stripped under `python -O`.
    if not isinstance(parsed, dict):
        raise RuntimeError(
            f"JSON parse error: expected a JSON object, got {type(parsed).__name__}"
            f" | url={url} | data_snippet={data[:200]}"
        )
    return parsed
def fetch_series_values(series_id: str, start: ISODate, end: ISODate, api_key: str, verbose: bool) -> IndicatorValues:
    """Fetch one series' observations between ``start`` and ``end``.

    This function used to carry a line-for-line copy of
    ``fred_observations``; it now delegates so the two entry points cannot
    drift apart. Arguments, return value and raised ``RuntimeError`` are
    exactly those of ``fred_observations``.
    """
    return fred_observations(series_id, start, end, api_key, verbose)
# --------------------------- | |
# Signals & assessment | |
# --------------------------- | |
def compute_orders_yoy_3mma(new_orders: IndicatorValues) -> Optional[float]:
    """Return the year-over-year % change of the 3-observation average of orders.

    Missing observations are dropped first. The latest smoothed value is
    compared with the smoothed value twelve positions earlier. Returns None
    when fewer than 15 observations are available or the earlier value is
    zero.
    """
    observed = [v for v in new_orders.values if v is not None]
    if len(observed) < 15:
        return None
    smoothed = moving_average(observed, 3)
    if len(smoothed) < 13:
        return None
    latest, year_ago = smoothed[-1], smoothed[-13]
    # Guard the division below.
    return None if year_ago == 0.0 else (latest / year_ago - 1.0) * 100.0
# Duplicate dataclass definitions removed — original definitions appear earlier in the file. | |
def compute_tripwires(ind: Indicators, orders_tripwire_thresh: float) -> TripwireFlags:
    """Evaluate the four risk-side tripwires from the collected indicators.

    * HY OAS spike: the latest spread is at least 1.5 above the minimum of
      the last 63 non-missing observations (or of all of them, if fewer).
    * Unemployment: the 3-observation average is at least 0.5 above its
      value six positions earlier; needs at least 7 smoothed points.
    * Orders contraction: orders YoY (3-observation average) is at or below
      ``orders_tripwire_thresh``.
    * Curve inverted: 10Y minus 2Y yield is below -0.25.

    A signal whose inputs are unavailable is reported as False.
    """
    # High-yield spread versus its recent low.
    spreads = [v for v in ind.hy_oas.values if v is not None]
    spread_spike = False
    if spreads:
        recent_low = min(spreads[-63:])
        spread_spike = (spreads[-1] - recent_low) >= 1.5

    # Smoothed unemployment now versus six positions earlier.
    jobless = [v for v in ind.unrate.values if v is not None]
    jobless_avg = moving_average(jobless, 3) if len(jobless) >= 3 else []
    jobless_rising = False
    if len(jobless_avg) >= 7:
        jobless_rising = (jobless_avg[-1] - jobless_avg[-7]) >= 0.5

    # Orders momentum.
    yoy = compute_orders_yoy_3mma(ind.new_orders_proxy)
    contraction = False if yoy is None else yoy <= orders_tripwire_thresh

    # Yield-curve slope.
    ten_year = latest_value(ind.dgs10)
    two_year = latest_value(ind.dgs2)
    inverted = False
    if not (ten_year is None or two_year is None):
        inverted = (ten_year - two_year) < -0.25

    return TripwireFlags(
        hy_oas_spike=spread_spike,
        unemp_3m_up_0p5=jobless_rising,
        orders_contraction=contraction,
        curve_inverted=inverted,
    )
def compute_supports(ind: Indicators, orders_support_thresh: float) -> SupportFlags:
    """Evaluate the four supportive signals from the collected indicators.

    * CPI YoY downtrend: the last three year-over-year CPI readings are
      non-increasing (each reading compares a CPI level with the level
      twelve positions earlier among non-missing observations).
    * Curve positive: 10Y minus 2Y yield is above 0.25.
    * Orders expansion: orders YoY (3-observation average) is at or above
      ``orders_support_thresh``.
    * HY OAS low: the latest non-missing spread is at or below 4.5.

    A signal whose inputs are unavailable is reported as False.
    """
    levels = [v for v in ind.cpi.values if v is not None]
    # Pair each level with the one twelve positions later; zip() yields
    # nothing when there are fewer than 13 levels.
    inflation = [
        (later / earlier - 1.0) * 100.0
        for earlier, later in zip(levels, levels[12:])
        if earlier != 0.0
    ]
    disinflating = len(inflation) >= 3 and inflation[-1] <= inflation[-2] <= inflation[-3]

    ten_year = latest_value(ind.dgs10)
    two_year = latest_value(ind.dgs2)
    steep = False
    if not (ten_year is None or two_year is None):
        steep = (ten_year - two_year) > 0.25

    yoy = compute_orders_yoy_3mma(ind.new_orders_proxy)
    expanding = False if yoy is None else yoy >= orders_support_thresh

    spread_now = latest_value(ind.hy_oas)
    spread_low = spread_now is not None and spread_now <= 4.5

    return SupportFlags(
        cpi_yoy_downtrend=disinflating,
        curve_positive=steep,
        orders_expansion=expanding,
        hy_oas_low=spread_low,
    )
def assess_regime(trip: TripwireFlags, sup: SupportFlags) -> RegimeAssessment:
    """Classify the regime from the active tripwire and support flags.

    Two or more tripwires mean ``HARD_RESET_RISK``; otherwise three or more
    supports mean ``NO_RESET``; anything else is ``SOFT_RESET_BASE``. The
    labels of the active flags are returned alongside the regime.
    """
    tripwire_table = (
        (trip.hy_oas_spike, "HY_OAS_SPIKE"),
        (trip.unemp_3m_up_0p5, "UNEMP_3M_UP_0.5PP"),
        (trip.orders_contraction, "ORDERS_YoY_3mma<=threshold"),
        (trip.curve_inverted, "CURVE_INVERTED"),
    )
    support_table = (
        (sup.cpi_yoy_downtrend, "CPI_YoY_DOWNTREND"),
        (sup.curve_positive, "CURVE_POSITIVE"),
        (sup.orders_expansion, "ORDERS_YoY_3mma>=threshold"),
        (sup.hy_oas_low, "HY_OAS_LOW"),
    )
    active_trips: List[str] = [label for is_on, label in tripwire_table if is_on]
    active_sups: List[str] = [label for is_on, label in support_table if is_on]

    # Tripwires take precedence over supports.
    verdict: Regime = "SOFT_RESET_BASE"
    if len(active_trips) >= 2:
        verdict = "HARD_RESET_RISK"
    elif len(active_sups) >= 3:
        verdict = "NO_RESET"
    return RegimeAssessment(regime=verdict, tripwires_on=active_trips, supports_on=active_sups)
def cta_for(regime: Regime) -> CallToAction:
    """Return the guidance text for ``regime``.

    Any value other than ``HARD_RESET_RISK`` or ``NO_RESET`` gets the
    soft-reset guidance.
    """
    if regime == "HARD_RESET_RISK":
        # The hyphen in "Hard‑reset" is a non-ASCII character in the
        # original text and is reproduced as-is.
        headline = "Hard‑reset risk: tighten risk, raise resilience"
        bullets = [
            "Top up your cash buffer; avoid new leverage.",
            "Cut position sizes in riskier strategies; prioritize quality assets.",
            "Stage any new buys in small tranches; wait for stabilization signals.",
        ]
    elif regime == "NO_RESET":
        headline = "No reset: stay invested, avoid chasing"
        bullets = [
            "Keep core allocations; do not chase hot stories.",
            "Trim outsized winners; maintain some dry powder.",
            "If rates stay high, prefer quality balance sheets and near-term cash flows.",
        ]
    else:
        headline = "Soft reset: keep course, rebalance by rules"
        bullets = [
            "Continue scheduled investments; keep a reasonable cash buffer.",
            "Rebalance if any sleeve drifts >5 percentage points.",
            "Favor quality and reasonable valuations; limit concentration.",
        ]
    return CallToAction(headline=headline, bullets=bullets)
# --------------------------- | |
# Collection & report | |
# --------------------------- | |
def fred_observations(series_id: str, start: ISODate, end: ISODate, api_key: str, verbose: bool) -> IndicatorValues:
    """Fetch one series' observations and convert them to parallel lists.

    Args:
        series_id: FRED series identifier.
        start: First observation date to request.
        end: Last observation date to request.
        api_key: FRED API key.
        verbose: When true, print the request URL (which includes the key).

    Returns:
        Dates and values in response order. A value of ``"."`` or ``""``, or
        one that does not parse as a float, becomes None. Entries that are
        not JSON objects are skipped.

    Raises:
        RuntimeError: If the request fails or ``observations`` is not a list.
    """
    url = fred_observations_url(series_id, start, end, api_key)
    if verbose:
        print("GET", url)
    payload = http_get_json(url, user_agent="reset-monitor/1.3 (+https://fred.stlouisfed.org/docs/api/fred/)")
    records = payload.get("observations", [])
    if not isinstance(records, list):
        raise RuntimeError(f"Unexpected JSON for series_id={series_id}: no 'observations' list")

    def to_number(text: str) -> Optional[float]:
        # "." and "" are treated as missing markers.
        if text in (".", ""):
            return None
        try:
            return float(text)
        except ValueError:
            return None

    rows = [rec for rec in records if isinstance(rec, dict)]
    dates: List[ISODate] = [str(rec.get("date", "")) for rec in rows]
    vals: List[Optional[float]] = [to_number(str(rec.get("value", "."))) for rec in rows]
    return IndicatorValues(dates=dates, values=vals)
def collect_indicators(api_key: str, since: ISODate, until: ISODate, verbose: bool) -> Indicators:
    """Fetch every tracked series for the ``since``..``until`` window.

    A series that fails to download is replaced by a two-entry placeholder
    of missing values and a warning is written to stderr, so one bad series
    does not abort the run. An unregistered API key is re-raised with setup
    guidance, because no other request would succeed either.

    Raises:
        RuntimeError: If FRED reports that the API key is not registered.
    """

    def grab(key: str) -> IndicatorValues:
        spec = SERIES_SPECS[key]
        try:
            series = fred_observations(spec.id, since, until, api_key, verbose)
        except RuntimeError as e:
            msg = str(e)
            key_rejected = "api_key is not registered" in msg or "api key is not registered" in msg
            if key_rejected:
                raise RuntimeError(
                    "FRED API rejected your key: 'api_key is not registered'.\n"
                    "Fix: sign up for a free key and pass it with --fred-key or set FRED_API_KEY.\n"
                    "Docs: https://fred.stlouisfed.org/docs/api/api_key.html"
                )
            sys.stderr.write(f"[WARN] {spec.id} failed: {msg}\n")
            return IndicatorValues(dates=[since, until], values=[None, None])
        if len(series.values) == 0:
            sys.stderr.write(f"[WARN] No observations for {spec.id}\n")
        return series

    # (Indicators field, SERIES_SPECS key) in the order the requests are issued.
    field_to_key = (
        ("unrate", "UNRATE"),
        ("new_orders_proxy", "NEW_ORDERS_PROXY"),
        ("hy_oas", "HY_OAS"),
        ("dgs10", "DGS10"),
        ("dgs2", "DGS2"),
        ("cpi", "CPI"),
        ("umich", "UMICH"),
        ("sp500", "SP500"),
    )
    fetched = {field: grab(key) for field, key in field_to_key}
    return Indicators(**fetched)
def indicators_snapshot(ind: Indicators) -> Mapping[str, Optional[float]]:
    """Return the latest reading of each indicator plus two derived figures.

    The derived figures are the 10Y-2Y yield spread and the orders
    year-over-year change; either is None when its inputs are unavailable.
    Insertion order is the order used when the report is printed.
    """
    ten_year = latest_value(ind.dgs10)
    two_year = latest_value(ind.dgs2)
    have_both_yields = ten_year is not None and two_year is not None
    spread: Optional[float] = (ten_year - two_year) if have_both_yields else None

    return {
        "UNRATE": latest_value(ind.unrate),
        "NEWORDER": latest_value(ind.new_orders_proxy),
        "ORDERS_YoY_3mma": compute_orders_yoy_3mma(ind.new_orders_proxy),
        "HY_OAS": latest_value(ind.hy_oas),
        "DGS10": ten_year,
        "DGS2": two_year,
        "YC_SPREAD_10Y_2Y": spread,
        "CPI_index": latest_value(ind.cpi),
        "UMICH": latest_value(ind.umich),
        "SP500": latest_value(ind.sp500),
    }
def run_monitor(api_key: str, since: ISODate, verbose: bool, orders_tripwire_thresh: float, orders_support_thresh: float) -> MonitorReport:
    """Run one full monitor pass and return the assembled report.

    Data is fetched from ``since`` through today's UTC date, the tripwire
    and support signals are evaluated, and the resulting regime is paired
    with its call to action and a snapshot of the latest readings.

    Raises:
        RuntimeError: Propagated from ``collect_indicators``.
    """
    as_of = iso_today()
    indicators = collect_indicators(api_key, since, as_of, verbose)
    assessment = assess_regime(
        compute_tripwires(indicators, orders_tripwire_thresh),
        compute_supports(indicators, orders_support_thresh),
    )
    return MonitorReport(
        as_of=as_of,
        regime=assessment,
        indicators_snapshot=indicators_snapshot(indicators),
        call_to_action=cta_for(assessment.regime),
    )
def pretty_print(report: MonitorReport) -> None:
    """Write a plain-text rendering of ``report`` to stdout.

    The output has a header with the regime and active signals, a snapshot
    section with one right-aligned label per line (``n/a`` for missing
    readings), and the call to action.
    """
    assessment = report.regime
    action = report.call_to_action
    trip_text = ", ".join(assessment.tripwires_on) if assessment.tripwires_on else "none"
    support_text = ", ".join(assessment.supports_on) if assessment.supports_on else "none"

    print(f"=== Economic Reset Monitor (as of {report.as_of}) ===")
    print(f"Regime: {assessment.regime}")
    print("Tripwires ON:", trip_text)
    print("Supports ON:", support_text)

    print("\nSnapshot:")
    for label, reading in report.indicators_snapshot.items():
        shown = 'n/a' if reading is None else reading
        print(f" {label:>18}: {shown}")

    print("\nCall to Action:")
    print(" ", action.headline)
    for bullet in action.bullets:
        print(" -", bullet)
def main(argv: Optional[Sequence[str]] = None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` makes argparse read
            ``sys.argv``.

    Returns:
        ``0`` on success, ``1`` if the monitor run or saving the report
        fails, ``2`` if no API key was supplied. Invalid command-line
        arguments, including a malformed ``--since`` date, make argparse
        exit with status 2.
    """

    def iso_date_arg(text: str) -> ISODate:
        # Reject malformed dates up front; otherwise the mistake only shows
        # up as one failed request per series.
        try:
            return datetime.strptime(text, "%Y-%m-%d").date().isoformat()
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid date {text!r}; expected YYYY-MM-DD") from None

    p = argparse.ArgumentParser(description="Automated economic reset monitor using FRED public API")
    p.add_argument("--fred-key", type=str, default=os.environ.get("FRED_API_KEY", ""), help="FRED API key (or set FRED_API_KEY)")
    p.add_argument("--since", type=iso_date_arg, default=months_back(36), help="Observation start date (YYYY-MM-DD). Default: 36 months ago")
    p.add_argument("--save", type=str, default="", help="Optional path to save JSON report")
    p.add_argument("--verbose", action="store_true", help="Print request URLs and per-series warnings")
    p.add_argument("--orders-tripwire", type=float, default=-5.0, help="Orders YoY 3mma threshold for contraction tripwire (default: -5.0)")
    p.add_argument("--orders-support", type=float, default=3.0, help="Orders YoY 3mma threshold for expansion support (default: +3.0)")
    args = p.parse_args(argv)

    if not args.fred_key:
        sys.stderr.write("ERROR: Set FRED_API_KEY env var or pass --fred-key YOURKEY\n")
        return 2

    try:
        report = run_monitor(
            api_key=args.fred_key,
            since=args.since,
            verbose=args.verbose,
            orders_tripwire_thresh=args.orders_tripwire,
            orders_support_thresh=args.orders_support,
        )
    except RuntimeError as e:
        sys.stderr.write(f"Failed to run monitor: {e}\n")
        return 1

    pretty_print(report)

    if args.save:
        # An unwritable path should produce a clean error and exit code,
        # not a traceback after the report has already been printed.
        try:
            with open(args.save, "w", encoding="utf-8") as f:
                json.dump(dataclasses.asdict(report), f, ensure_ascii=False, indent=2)
        except OSError as e:
            sys.stderr.write(f"Failed to save report to {args.save}: {e}\n")
            return 1
        print(f"\nSaved report -> {args.save}")
    return 0
# Script entry point: exit the process with main()'s return code.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment