Skip to content

Instantly share code, notes, and snippets.

@pligor
Created October 6, 2025 10:11
Show Gist options
  • Save pligor/ee0bb214e5f2e9f0b5a27ffaf4695e11 to your computer and use it in GitHub Desktop.
Save pligor/ee0bb214e5f2e9f0b5a27ffaf4695e11 to your computer and use it in GitHub Desktop.
🚨 Automated Economic Reset Monitor: Real-time recession risk assessment using FRED API data with tripwire alerts for market downturns
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import dataclasses
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List, Literal, Mapping, Optional, Sequence, TypedDict
# Dates are carried through this module as plain strings in YYYY-MM-DD form.
ISODate = str
# Base URL that FRED endpoint paths are appended to.
FRED_API_BASE = "https://api.stlouisfed.org/fred"
# ---------------------------
# Series specs
# ---------------------------
@dataclasses.dataclass(frozen=True)
class SeriesSpec:
    """Immutable descriptor of one data series tracked by the monitor."""

    # Series identifier; this is the value sent to FRED as ``series_id``.
    id: str
    # Short human-readable label.
    name: str
    # Publication cadence hint for the series.
    frequency_hint: Literal["daily", "monthly"]
    # Longer free-text description of what the series measures.
    description: str
# Registry of tracked series. The dictionary key is the monitor's own handle
# for a series; the FRED identifier lives in ``SeriesSpec.id`` and may differ
# from the key (e.g. "HY_OAS" -> "BAMLH0A0HYM2").
SERIES_SPECS: Mapping[str, SeriesSpec] = {
    "UNRATE": SeriesSpec(
        id="UNRATE",
        name="Unemployment rate",
        frequency_hint="monthly",
        description="US unemployment rate (%)",
    ),
    "NEW_ORDERS_PROXY": SeriesSpec(
        id="NEWORDER",
        name="New Orders (proxy)",
        frequency_hint="monthly",
        description="Manufacturers' New Orders: Nondefense Capital Goods ex-Aircraft",
    ),
    "HY_OAS": SeriesSpec(
        id="BAMLH0A0HYM2",
        name="High Yield OAS",
        frequency_hint="daily",
        description="ICE BofA US High Yield OAS (%)",
    ),
    "DGS10": SeriesSpec(
        id="DGS10",
        name="10Y Treasury",
        frequency_hint="daily",
        description="10-year constant maturity Treasury yield (%)",
    ),
    "DGS2": SeriesSpec(
        id="DGS2",
        name="2Y Treasury",
        frequency_hint="daily",
        description="2-year constant maturity Treasury yield (%)",
    ),
    "CPI": SeriesSpec(
        id="CPIAUCSL",
        name="CPI (All items)",
        frequency_hint="monthly",
        description="CPI index (1982-84=100)",
    ),
    "UMICH": SeriesSpec(
        id="UMCSENT",
        name="UMich Consumer Sentiment",
        frequency_hint="monthly",
        description="Consumer sentiment index",
    ),
    "SP500": SeriesSpec(
        id="SP500",
        name="S&P 500",
        frequency_hint="daily",
        description="S&P 500 index (close)",
    ),
}
# ---------------------------
# Types
# ---------------------------
class FredObs(TypedDict):
    """Typed view of a single observation record: a date and its raw value."""

    # Observation date as a YYYY-MM-DD string.
    date: ISODate
    # Observation value kept as text, i.e. not yet converted to a number.
    value: str
class FredError(TypedDict, total=False):
    """Typed view of an error payload; every key is optional (``total=False``)."""

    # Numeric error code.
    error_code: int
    # Human-readable explanation of the error.
    error_message: str
@dataclasses.dataclass(frozen=True)
class IndicatorValues:
    """History of one series held as two parallel lists."""

    # Observation dates (YYYY-MM-DD); entry i labels ``values[i]``.
    dates: List[ISODate]
    # Observation values; ``None`` stands for a missing reading.
    values: List[Optional[float]]
@dataclasses.dataclass(frozen=True)
class Indicators:
    """Bundle holding one ``IndicatorValues`` history per tracked series."""

    # Unemployment rate.
    unrate: IndicatorValues
    # New-orders proxy series.
    new_orders_proxy: IndicatorValues
    # High-yield option-adjusted spread.
    hy_oas: IndicatorValues
    # 10-year Treasury yield.
    dgs10: IndicatorValues
    # 2-year Treasury yield.
    dgs2: IndicatorValues
    # Consumer price index level.
    cpi: IndicatorValues
    # University of Michigan consumer sentiment.
    umich: IndicatorValues
    # S&P 500 index.
    sp500: IndicatorValues
@dataclasses.dataclass(frozen=True)
class TripwireFlags:
    """Boolean warning signals; each one counts toward a hard-reset verdict."""

    # High-yield spread has jumped relative to its recent low.
    hy_oas_spike: bool
    # Smoothed unemployment rate has risen by at least 0.5 points.
    unemp_3m_up_0p5: bool
    # Smoothed year-over-year orders growth is at or below the tripwire threshold.
    orders_contraction: bool
    # 10-year minus 2-year yield spread is meaningfully negative.
    curve_inverted: bool
@dataclasses.dataclass(frozen=True)
class SupportFlags:
    """Boolean supportive signals; enough of them point to a no-reset verdict."""

    # Year-over-year CPI change has not increased over the latest readings.
    cpi_yoy_downtrend: bool
    # 10-year minus 2-year yield spread is meaningfully positive.
    curve_positive: bool
    # Smoothed year-over-year orders growth is at or above the support threshold.
    orders_expansion: bool
    # Latest high-yield spread is at a low level.
    hy_oas_low: bool
# The three possible verdicts of the monitor.
Regime = Literal["HARD_RESET_RISK", "SOFT_RESET_BASE", "NO_RESET"]


@dataclasses.dataclass(frozen=True)
class RegimeAssessment:
    """Verdict of the monitor together with the signals that were active."""

    # Overall classification.
    regime: Regime
    # Labels of the tripwire signals that are switched on.
    tripwires_on: List[str]
    # Labels of the support signals that are switched on.
    supports_on: List[str]
@dataclasses.dataclass(frozen=True)
class CallToAction:
    """Human-readable guidance: one headline plus a list of bullet points."""

    # One-line summary of the recommended stance.
    headline: str
    # Individual recommendations, in display order.
    bullets: List[str]
@dataclasses.dataclass(frozen=True)
class MonitorReport:
    """Complete result of one monitor run."""

    # Date the report was produced for (YYYY-MM-DD).
    as_of: ISODate
    # Verdict together with the active signal labels.
    regime: RegimeAssessment
    # Latest reading per displayed indicator; ``None`` when unavailable.
    indicators_snapshot: Mapping[str, Optional[float]]
    # Guidance text associated with the verdict.
    call_to_action: CallToAction
# ---------------------------
# Helpers
# ---------------------------
def iso_today() -> ISODate:
    """Return the current calendar date in UTC as a ``YYYY-MM-DD`` string."""
    utc_now = datetime.now(tz=timezone.utc)
    today = utc_now.date()
    return today.isoformat()
def months_back(n: int) -> ISODate:
    """Return the first day of the month ``n`` months before the current UTC month.

    Args:
        n: Number of months to step back. ``0`` gives the first day of the
            current month; a negative value steps forward into the future.

    Returns:
        The resulting date formatted as ``YYYY-MM-01``.
    """
    today = datetime.now(timezone.utc).date()
    # Count months on a single linear axis (year * 12 + zero-based month) so
    # that a year rollover in either direction is handled by one divmod. The
    # previous loop only normalised months <= 0 and therefore produced invalid
    # months such as "2025-15-01" for negative n.
    month_index = today.year * 12 + (today.month - 1) - n
    year, zero_based_month = divmod(month_index, 12)
    return f"{year:04d}-{zero_based_month + 1:02d}-01"
def moving_average(seq: List[float], window: int) -> List[float]:
    """Return the trailing simple moving average of ``seq``.

    Args:
        seq: Input values in chronological order.
        window: Number of consecutive values averaged per output entry; must
            be a positive integer.

    Returns:
        One average per full window, i.e. ``max(0, len(seq) - window + 1)``
        entries. Output entry ``k`` averages ``seq[k : k + window]``.

    Raises:
        ValueError: If ``window`` is not positive. Previously ``0`` surfaced
            as a ZeroDivisionError and negative windows produced meaningless
            values or an IndexError.
    """
    if window <= 0:
        raise ValueError(f"window must be a positive integer, got {window}")
    out: List[float] = []
    running = 0.0
    for i, v in enumerate(seq):
        running += v
        # Once more than `window` values have been added, drop the one that
        # just slid out of the window.
        if i >= window:
            running -= seq[i - window]
        if i + 1 >= window:
            out.append(running / float(window))
    return out
def latest_value(v: IndicatorValues) -> Optional[float]:
    """Return the last non-``None`` entry of ``v.values``, or ``None`` if there is none."""
    newest_first = reversed(v.values)
    return next((reading for reading in newest_first if reading is not None), None)
# ---------------------------
# FRED fetch with diagnostics
# ---------------------------
def fred_observations_url(series_id: str, start: ISODate, end: ISODate, api_key: str) -> str:
    """Build the ``series/observations`` request URL under ``FRED_API_BASE``.

    The query string carries the series id, the observation start and end
    dates, the API key, and a request for JSON output, in that order.
    """
    query = urllib.parse.urlencode(
        [
            ("series_id", series_id),
            ("observation_start", start),
            ("observation_end", end),
            ("api_key", api_key),
            ("file_type", "json"),
        ]
    )
    endpoint = FRED_API_BASE + "/series/observations"
    return endpoint + "?" + query
def _redact_api_key(url: str) -> str:
    """Return ``url`` with the value of any ``api_key`` query parameter masked."""
    try:
        parts = urllib.parse.urlsplit(url)
        pairs = urllib.parse.parse_qsl(parts.query, keep_blank_values=True)
    except ValueError:
        # Unparseable URL: leave it alone rather than hide the real failure.
        return url
    if all(name != "api_key" for name, _ in pairs):
        return url
    masked = [(name, "REDACTED" if name == "api_key" else value) for name, value in pairs]
    return urllib.parse.urlunsplit(parts._replace(query=urllib.parse.urlencode(masked)))


def http_get_json(url: str, *, user_agent: str, timeout_sec: int = 30) -> Dict[str, object]:
    """GET ``url`` and return the response body decoded as a JSON object.

    Args:
        url: Full request URL.
        user_agent: Value sent in the ``User-Agent`` header.
        timeout_sec: Timeout in seconds handed to ``urlopen``.

    Returns:
        The parsed top-level JSON object.

    Raises:
        RuntimeError: On an HTTP error status, a network failure (including
            timeouts and dropped connections), a body that is not valid
            UTF-8, a body that is not valid JSON, or a JSON document whose
            top level is not an object. Messages include the request URL with
            the ``api_key`` value masked so the key does not end up in logs.
    """
    import http.client  # local import: only needed for the exception type below

    safe_url = _redact_api_key(url)
    req = urllib.request.Request(url, headers={"User-Agent": user_agent})
    try:
        with urllib.request.urlopen(req, timeout=timeout_sec) as resp:
            data = resp.read().decode("utf-8")
    except urllib.error.HTTPError as e:
        # Best effort: any failure while reading the error body just leaves it empty.
        try:
            body = e.read().decode("utf-8")
        except Exception:
            body = ""
        # The error body may itself be JSON carrying an "error_message".
        try:
            fred_err = json.loads(body)
        except ValueError:
            fred_err = None
        if isinstance(fred_err, dict) and "error_message" in fred_err:
            raise RuntimeError(
                f"HTTP {e.code} {e.reason} from FRED: {fred_err['error_message']} | url={safe_url}"
            ) from None
        raise RuntimeError(
            f"HTTP {e.code} {e.reason} from FRED | url={safe_url} | body_snippet={body[:200]}"
        ) from None
    except urllib.error.URLError as e:
        raise RuntimeError(f"Network error: {e.reason} | url={safe_url}") from None
    except (OSError, http.client.HTTPException) as e:
        # Timeouts or resets while reading the body are not wrapped in
        # URLError; convert them so callers only need to catch RuntimeError.
        raise RuntimeError(f"Network error: {e} | url={safe_url}") from None
    except UnicodeDecodeError as e:
        raise RuntimeError(f"Response is not valid UTF-8: {e} | url={safe_url}") from None

    try:
        parsed = json.loads(data)
    except ValueError as ex:
        raise RuntimeError(
            f"JSON parse error: {ex} | url={safe_url} | data_snippet={data[:200]}"
        ) from None
    # Explicit check instead of `assert`, which disappears under `python -O`.
    if not isinstance(parsed, dict):
        raise RuntimeError(
            f"JSON parse error: expected a JSON object, got {type(parsed).__name__}"
            f" | url={safe_url} | data_snippet={data[:200]}"
        )
    return parsed
def fetch_series_values(series_id: str, start: ISODate, end: ISODate, api_key: str, verbose: bool) -> IndicatorValues:
    """Download one series for a date range and return it as parallel lists.

    Args:
        series_id: Series identifier placed in the request.
        start: First observation date requested (YYYY-MM-DD).
        end: Last observation date requested (YYYY-MM-DD).
        api_key: API key placed in the request.
        verbose: When true, print the request URL before fetching.

    Returns:
        An ``IndicatorValues`` with one date and one value per observation
        record. A value is ``None`` when the raw text is ``"."``, empty, or
        cannot be converted to ``float``.

    Raises:
        RuntimeError: If the response's ``observations`` entry is not a list
            (errors raised by the HTTP helper propagate unchanged).
    """
    request_url = fred_observations_url(series_id, start, end, api_key)
    if verbose:
        print("GET", request_url)
    payload = http_get_json(request_url, user_agent="reset-monitor/1.3 (+https://fred.stlouisfed.org/docs/api/fred/)")
    observations = payload.get("observations", [])
    if not isinstance(observations, list):
        raise RuntimeError(f"Unexpected JSON for series_id={series_id}: no 'observations' list")

    dates: List[ISODate] = []
    values: List[Optional[float]] = []
    for record in observations:
        # Entries that are not JSON objects are skipped.
        if not isinstance(record, dict):
            continue
        stamp = str(record.get("date", ""))
        text = str(record.get("value", "."))
        number: Optional[float] = None
        if text not in (".", ""):
            try:
                number = float(text)
            except ValueError:
                number = None
        dates.append(stamp)
        values.append(number)
    return IndicatorValues(dates=dates, values=values)
# ---------------------------
# Signals & assessment
# ---------------------------
def compute_orders_yoy_3mma(new_orders: IndicatorValues) -> Optional[float]:
    """Return the percent change of the 3-point moving average versus 12 entries earlier.

    Missing (``None``) readings are dropped first. Returns ``None`` when fewer
    than 15 readings remain or when the earlier average is exactly zero.
    """
    observed = [reading for reading in new_orders.values if reading is not None]
    if len(observed) < 15:
        return None
    smoothed = moving_average(observed, 3)
    # Defensive: 13 smoothed points are needed to look 12 entries back.
    if len(smoothed) < 13:
        return None
    latest, year_ago = smoothed[-1], smoothed[-13]
    if year_ago == 0.0:
        return None
    ratio = latest / year_ago
    return (ratio - 1.0) * 100.0
# Duplicate dataclass definitions removed — original definitions appear earlier in the file.
def compute_tripwires(ind: Indicators, orders_tripwire_thresh: float) -> TripwireFlags:
    """Evaluate the four warning signals from the collected indicator histories.

    Args:
        ind: Collected indicator histories.
        orders_tripwire_thresh: Orders growth (percent) at or below which the
            contraction tripwire fires.

    Returns:
        A ``TripwireFlags`` with:
        * ``hy_oas_spike`` - latest spread is at least 1.5 above the minimum
          of the last 63 non-missing readings;
        * ``unemp_3m_up_0p5`` - 3-point average of unemployment is at least
          0.5 above its value six entries earlier;
        * ``orders_contraction`` - orders growth is available and at or below
          the threshold;
        * ``curve_inverted`` - 10Y minus 2Y is below -0.25.
    """
    # High-yield spread: distance of the latest reading from the recent low.
    spreads = [x for x in ind.hy_oas.values if x is not None]
    hy_spike = bool(spreads) and (spreads[-1] - min(spreads[-63:])) >= 1.5

    # Unemployment: smoothed level now versus six entries earlier.
    jobless = [x for x in ind.unrate.values if x is not None]
    jobless_smoothed = moving_average(jobless, 3)
    un_rise = len(jobless_smoothed) >= 7 and (jobless_smoothed[-1] - jobless_smoothed[-7]) >= 0.5

    # Orders: smoothed year-over-year growth against the caller's threshold.
    orders_growth = compute_orders_yoy_3mma(ind.new_orders_proxy)
    orders_contr = orders_growth is not None and orders_growth <= orders_tripwire_thresh

    # Yield curve: 10-year minus 2-year spread.
    ten_year = latest_value(ind.dgs10)
    two_year = latest_value(ind.dgs2)
    curve_inv = ten_year is not None and two_year is not None and (ten_year - two_year) < -0.25

    return TripwireFlags(
        hy_oas_spike=hy_spike,
        unemp_3m_up_0p5=un_rise,
        orders_contraction=orders_contr,
        curve_inverted=curve_inv,
    )
def compute_supports(ind: Indicators, orders_support_thresh: float) -> SupportFlags:
    """Evaluate the four supportive signals from the collected indicator histories.

    Args:
        ind: Collected indicator histories.
        orders_support_thresh: Orders growth (percent) at or above which the
            expansion support fires.

    Returns:
        A ``SupportFlags`` with:
        * ``cpi_yoy_downtrend`` - the last three year-over-year CPI changes
          are non-increasing;
        * ``curve_positive`` - 10Y minus 2Y is above 0.25;
        * ``orders_expansion`` - orders growth is available and at or above
          the threshold;
        * ``hy_oas_low`` - latest non-missing spread is at most 4.5.
    """
    # Year-over-year CPI change: each reading against the one 12 entries earlier.
    price_index = [x for x in ind.cpi.values if x is not None]
    yoy_changes: List[float] = [
        (current / base - 1.0) * 100.0
        for base, current in zip(price_index, price_index[12:])
        if base  # a zero base would divide by zero, so that pair is skipped
    ]
    cpi_down = False
    if len(yoy_changes) >= 3:
        oldest, middle, newest = yoy_changes[-3:]
        cpi_down = newest <= middle <= oldest

    ten_year = latest_value(ind.dgs10)
    two_year = latest_value(ind.dgs2)
    curve_pos = ten_year is not None and two_year is not None and (ten_year - two_year) > 0.25

    orders_growth = compute_orders_yoy_3mma(ind.new_orders_proxy)
    orders_exp = orders_growth is not None and orders_growth >= orders_support_thresh

    spreads = [x for x in ind.hy_oas.values if x is not None]
    hy_low = bool(spreads) and spreads[-1] <= 4.5

    return SupportFlags(
        cpi_yoy_downtrend=cpi_down,
        curve_positive=curve_pos,
        orders_expansion=orders_exp,
        hy_oas_low=hy_low,
    )
def assess_regime(trip: TripwireFlags, sup: SupportFlags) -> RegimeAssessment:
    """Turn the signal flags into a regime verdict plus the labels of active signals.

    Two or more active tripwires give ``HARD_RESET_RISK``. Otherwise three or
    more active supports give ``NO_RESET``. Anything else is
    ``SOFT_RESET_BASE``.
    """
    tripwire_table = (
        (trip.hy_oas_spike, "HY_OAS_SPIKE"),
        (trip.unemp_3m_up_0p5, "UNEMP_3M_UP_0.5PP"),
        (trip.orders_contraction, "ORDERS_YoY_3mma<=threshold"),
        (trip.curve_inverted, "CURVE_INVERTED"),
    )
    support_table = (
        (sup.cpi_yoy_downtrend, "CPI_YoY_DOWNTREND"),
        (sup.curve_positive, "CURVE_POSITIVE"),
        (sup.orders_expansion, "ORDERS_YoY_3mma>=threshold"),
        (sup.hy_oas_low, "HY_OAS_LOW"),
    )
    trips: List[str] = [label for active, label in tripwire_table if active]
    sups: List[str] = [label for active, label in support_table if active]

    # Tripwires take precedence over supports.
    regime: Regime
    if len(trips) >= 2:
        regime = "HARD_RESET_RISK"
    elif len(sups) >= 3:
        regime = "NO_RESET"
    else:
        regime = "SOFT_RESET_BASE"
    return RegimeAssessment(regime=regime, tripwires_on=trips, supports_on=sups)
def cta_for(regime: Regime) -> CallToAction:
    """Return the guidance text for ``regime``.

    Any value other than ``HARD_RESET_RISK`` or ``NO_RESET`` receives the
    soft-reset guidance.
    """
    if regime == "HARD_RESET_RISK":
        headline = "Hard‑reset risk: tighten risk, raise resilience"
        bullets = [
            "Top up your cash buffer; avoid new leverage.",
            "Cut position sizes in riskier strategies; prioritize quality assets.",
            "Stage any new buys in small tranches; wait for stabilization signals.",
        ]
    elif regime == "NO_RESET":
        headline = "No reset: stay invested, avoid chasing"
        bullets = [
            "Keep core allocations; do not chase hot stories.",
            "Trim outsized winners; maintain some dry powder.",
            "If rates stay high, prefer quality balance sheets and near-term cash flows.",
        ]
    else:
        headline = "Soft reset: keep course, rebalance by rules"
        bullets = [
            "Continue scheduled investments; keep a reasonable cash buffer.",
            "Rebalance if any sleeve drifts >5 percentage points.",
            "Favor quality and reasonable valuations; limit concentration.",
        ]
    return CallToAction(headline=headline, bullets=bullets)
# ---------------------------
# Collection & report
# ---------------------------
def fred_observations(series_id: str, start: ISODate, end: ISODate, api_key: str, verbose: bool) -> IndicatorValues:
    """Download one series for a date range and return it as parallel lists.

    This used to be a line-for-line copy of ``fetch_series_values``. It now
    delegates to that function so the fetch-and-parse logic exists in one
    place only; arguments, return value and raised errors are unchanged.

    Args:
        series_id: Series identifier placed in the request.
        start: First observation date requested (YYYY-MM-DD).
        end: Last observation date requested (YYYY-MM-DD).
        api_key: API key placed in the request.
        verbose: When true, the request URL is printed before fetching.

    Returns:
        An ``IndicatorValues`` with one date and one value per observation;
        missing or unparseable values are ``None``.

    Raises:
        RuntimeError: If the request fails or the response has no
            ``observations`` list.
    """
    return fetch_series_values(series_id, start, end, api_key, verbose)
def collect_indicators(api_key: str, since: ISODate, until: ISODate, verbose: bool) -> Indicators:
    """Fetch every tracked series for ``since``..``until`` and bundle the results.

    A series whose fetch fails is reported on stderr and replaced by a
    placeholder holding two ``None`` values, so one bad series does not abort
    the run. The exception is a rejected API key, which is re-raised with
    instructions because every other request would fail the same way.

    Raises:
        RuntimeError: If the fetch error message says the API key is not registered.
    """
    key_rejected_markers = ("api_key is not registered", "api key is not registered")

    def grab(key: str) -> IndicatorValues:
        spec = SERIES_SPECS[key]
        try:
            series = fred_observations(spec.id, since, until, api_key, verbose)
        except RuntimeError as err:
            reason = str(err)
            if any(marker in reason for marker in key_rejected_markers):
                raise RuntimeError(
                    "FRED API rejected your key: 'api_key is not registered'.\n"
                    "Fix: sign up for a free key and pass it with --fred-key or set FRED_API_KEY.\n"
                    "Docs: https://fred.stlouisfed.org/docs/api/api_key.html"
                )
            sys.stderr.write(f"[WARN] {spec.id} failed: {reason}\n")
            return IndicatorValues(dates=[since, until], values=[None, None])
        if not series.values:
            sys.stderr.write(f"[WARN] No observations for {spec.id}\n")
        return series

    # (Indicators field, SERIES_SPECS key) pairs, fetched in this order.
    field_to_key = (
        ("unrate", "UNRATE"),
        ("new_orders_proxy", "NEW_ORDERS_PROXY"),
        ("hy_oas", "HY_OAS"),
        ("dgs10", "DGS10"),
        ("dgs2", "DGS2"),
        ("cpi", "CPI"),
        ("umich", "UMICH"),
        ("sp500", "SP500"),
    )
    return Indicators(**{field: grab(key) for field, key in field_to_key})
def indicators_snapshot(ind: Indicators) -> Mapping[str, Optional[float]]:
    """Return the latest reading per indicator plus two derived figures.

    The derived entries are the smoothed year-over-year orders growth and the
    10Y minus 2Y yield spread. An entry is ``None`` when its input data is
    unavailable. Key order is the order used for display.
    """
    ten_year = latest_value(ind.dgs10)
    two_year = latest_value(ind.dgs2)
    if ten_year is None or two_year is None:
        spread: Optional[float] = None
    else:
        spread = ten_year - two_year

    snapshot: Dict[str, Optional[float]] = {}
    snapshot["UNRATE"] = latest_value(ind.unrate)
    snapshot["NEWORDER"] = latest_value(ind.new_orders_proxy)
    snapshot["ORDERS_YoY_3mma"] = compute_orders_yoy_3mma(ind.new_orders_proxy)
    snapshot["HY_OAS"] = latest_value(ind.hy_oas)
    snapshot["DGS10"] = ten_year
    snapshot["DGS2"] = two_year
    snapshot["YC_SPREAD_10Y_2Y"] = spread
    snapshot["CPI_index"] = latest_value(ind.cpi)
    snapshot["UMICH"] = latest_value(ind.umich)
    snapshot["SP500"] = latest_value(ind.sp500)
    return snapshot
def run_monitor(api_key: str, since: ISODate, verbose: bool, orders_tripwire_thresh: float, orders_support_thresh: float) -> MonitorReport:
    """Run the whole pipeline once: fetch, evaluate signals, classify, and package.

    Data is requested from ``since`` up to today's UTC date, which also
    becomes the report's ``as_of`` value.
    """
    as_of = iso_today()
    indicators = collect_indicators(api_key, since, as_of, verbose)
    tripwire_flags = compute_tripwires(indicators, orders_tripwire_thresh)
    support_flags = compute_supports(indicators, orders_support_thresh)
    assessment = assess_regime(tripwire_flags, support_flags)
    action = cta_for(assessment.regime)
    return MonitorReport(
        as_of=as_of,
        regime=assessment,
        indicators_snapshot=indicators_snapshot(indicators),
        call_to_action=action,
    )
def pretty_print(report: MonitorReport) -> None:
    """Write a plain-text rendering of ``report`` to stdout.

    Sections, in order: title with the as-of date, regime, active tripwires,
    active supports, indicator snapshot, and the call to action. Empty signal
    lists are shown as ``none`` and missing snapshot values as ``n/a``.
    """

    def listing(labels: List[str]) -> str:
        # Comma-separated labels, or a placeholder when nothing is active.
        return ", ".join(labels) if labels else "none"

    assessment = report.regime
    action = report.call_to_action

    print(f"=== Economic Reset Monitor (as of {report.as_of}) ===")
    print(f"Regime: {assessment.regime}")
    print("Tripwires ON:", listing(assessment.tripwires_on))
    print("Supports ON:", listing(assessment.supports_on))

    print("\nSnapshot:")
    for label, reading in report.indicators_snapshot.items():
        shown = "n/a" if reading is None else reading
        print(f" {label:>18}: {shown}")

    print("\nCall to Action:")
    print(" ", action.headline)
    for bullet in action.bullets:
        print(" -", bullet)
def main(argv: Optional[Sequence[str]] = None) -> int:
    """Command-line entry point: parse options, run the monitor, print and optionally save.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        ``0`` on success, ``1`` when the monitor run or saving the report
        fails, ``2`` when the API key is missing or ``--since`` is not a
        valid date.
    """
    p = argparse.ArgumentParser(description="Automated economic reset monitor using FRED public API")
    p.add_argument("--fred-key", type=str, default=os.environ.get("FRED_API_KEY", ""), help="FRED API key (or set FRED_API_KEY)")
    p.add_argument("--since", type=str, default=months_back(36), help="Observation start date (YYYY-MM-DD). Default: 36 months ago")
    p.add_argument("--save", type=str, default="", help="Optional path to save JSON report")
    p.add_argument("--verbose", action="store_true", help="Print request URLs and per-series warnings")
    p.add_argument("--orders-tripwire", type=float, default=-5.0, help="Orders YoY 3mma threshold for contraction tripwire (default: -5.0)")
    p.add_argument("--orders-support", type=float, default=3.0, help="Orders YoY 3mma threshold for expansion support (default: +3.0)")
    args = p.parse_args(argv)

    if not args.fred_key:
        sys.stderr.write("ERROR: Set FRED_API_KEY env var or pass --fred-key YOURKEY\n")
        return 2

    # Reject a malformed start date up front. Otherwise every series request
    # fails, each failure is only a warning, and the run would still print a
    # regime computed from no data at all.
    try:
        datetime.strptime(args.since, "%Y-%m-%d")
    except ValueError:
        sys.stderr.write(f"ERROR: --since must be a valid YYYY-MM-DD date, got {args.since!r}\n")
        return 2

    try:
        report = run_monitor(
            api_key=args.fred_key,
            since=args.since,
            verbose=args.verbose,
            orders_tripwire_thresh=args.orders_tripwire,
            orders_support_thresh=args.orders_support,
        )
    except RuntimeError as e:
        sys.stderr.write(f"Failed to run monitor: {e}\n")
        return 1

    pretty_print(report)

    if args.save:
        # An unwritable path is reported as an error instead of a traceback.
        try:
            with open(args.save, "w", encoding="utf-8") as f:
                json.dump(dataclasses.asdict(report), f, ensure_ascii=False, indent=2)
        except OSError as e:
            sys.stderr.write(f"Failed to save report to {args.save}: {e}\n")
            return 1
        print(f"\nSaved report -> {args.save}")
    return 0
# Script entry point: the return value of main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment