|
import argparse |
|
import shutil |
|
import time |
|
from datetime import date, datetime, timedelta |
|
|
|
import pandas as pd |
|
import yfinance as yf |
|
|
|
ETF_0050 = [ |
|
2330, 2308, 2317, 2454, 3711, 2891, 2383, 2345, 2382, 2881, |
|
2882, 2303, 3017, 2360, 2887, 2412, 2884, 2885, 2886, 2327, |
|
2890, 2357, 3231, 6669, 1303, 1216, 2344, 2368, 3653, 2449, |
|
2883, 2880, 2892, 2301, 2408, 3661, 5880, 7769, 2603, 2002, |
|
3008, 1301, 2059, 4904, 2395, 3045, 2207, 6919, 6505 |
|
] |
|
|
|
ETF_0051 = [ |
|
3037, 3665, 2379, 6770, 2313, 3034, 3036, 3443, 6515, 3481, |
|
6446, 3533, 3044, 5871, 1101, 2404, 2801, 5876, 4958, 4938, |
|
1326, 2324, 1590, 3702, 6239, 1519, 2376, 8046, 2912, 3189, |
|
2356, 2615, 2834, 6139, 6442, 6805, 2347, 2618, 1605, 2609, |
|
6415, 1504, 2409, 2474, 1102, 1402, 2353, 2812, 2027, 2385, |
|
3706, 1476, 2049, 8464, 9904, 8996, 2377, 1513, 3023, 1477, |
|
8210, 6285, 2105, 5434, 6531, 2610, 2633, 6409, 1802, 3005, |
|
2354, 2451, 1503, 2542, 7750, 5269, 1229, 6176, 9945, 2371, |
|
9910, 2006, 6789, 6472, 6781, 1319, 2845, 2915, 1795, 2646, |
|
6191, 7799, 2838, 6526, 2645, 2539, 8454, 4583, 6890, 2258 |
|
] |
|
|
|
|
|
def get_indicators(symbol: str, as_of: date) -> dict | None: |
|
"""Fetch last 30 trading days up to as_of and return OHLCV + BB + MA indicators.""" |
|
end = datetime.combine(as_of, datetime.min.time()) + timedelta(days=1) |
|
start = end - timedelta(days=60) # fetch extra days to ensure 30 trading days |
|
|
|
ticker = yf.Ticker(symbol) |
|
df = ticker.history(start=start.strftime("%Y-%m-%d"), end=end.strftime("%Y-%m-%d")) |
|
|
|
if df.empty or len(df) < 20: |
|
return None |
|
|
|
df = df.tail(30) |
|
close = df["Close"] |
|
|
|
ma20 = close.rolling(20).mean() |
|
std20 = close.rolling(20).std() |
|
upper = ma20 + 2 * std20 |
|
lower = ma20 - 2 * std20 |
|
ma5 = close.rolling(5).mean() |
|
|
|
last = df.iloc[-1] |
|
return { |
|
"open": last["Open"], |
|
"high": last["High"], |
|
"low": last["Low"], |
|
"close": last["Close"], |
|
"volume": last["Volume"], |
|
"upper_bb": upper.iloc[-1], |
|
"lower_bb": lower.iloc[-1], |
|
"ma5": ma5.iloc[-1], |
|
"ma20": ma20.iloc[-1], |
|
} |
|
|
|
|
|
def scan_symbols(as_of: date) -> pd.DataFrame: |
|
"""Scan all ETF_0050 and ETF_0051 symbols and return a DataFrame of indicators.""" |
|
symbols = [str(s) for s in ETF_0050] + [str(s) for s in ETF_0051] |
|
total = len(symbols) |
|
rows = [] |
|
|
|
for i, sym in enumerate(symbols, 1): |
|
ticker_sym = f"{sym}.TW" |
|
print(f"\r[{i:>3}/{total}] Fetching {ticker_sym}...", end=" ", flush=True) |
|
result = get_indicators(ticker_sym, as_of) |
|
if result is None: |
|
status = "skipped" |
|
else: |
|
rows.append({"symbol": ticker_sym, **result}) |
|
status = "done" |
|
|
|
# fixed parts: "[{i}/{total}] Fetching {ticker_sym}... {status} [ bar ] pct%" |
|
term_width = shutil.get_terminal_size().columns |
|
prefix = f"[{i:>3}/{total}] Fetching {ticker_sym}... {status}" |
|
suffix_template = " [{}] {:5.1f}%" |
|
# reserve space: suffix without the bar fill |
|
reserved = len(suffix_template.format("", i / total * 100)) |
|
bar_width = max(4, term_width - len(prefix) - reserved) |
|
filled = round(bar_width * i / total) |
|
bar = "█" * filled + "░" * (bar_width - filled) |
|
line = prefix + suffix_template.format(bar, i / total * 100) |
|
print(f"\r{line}", end="", flush=True) |
|
|
|
if i < total: |
|
time.sleep(1) |
|
|
|
print() # move past the progress line |
|
|
|
return pd.DataFrame(rows, columns=[ |
|
"symbol", "open", "high", "low", "close", "volume", |
|
"upper_bb", "lower_bb", "ma5", "ma20", |
|
]) |
|
|
|
|
|
def score_and_print(df: pd.DataFrame) -> None: |
|
"""Score each stock and print those that match a scoring condition.""" |
|
print("\n--- Scored Stocks ---") |
|
print(f"{'Symbol':<12} {'Score':>5} {'Close':>10} {'Upper BB':>10} {'Lower BB':>10} {'MA5':>10} {'MA20':>10}") |
|
print("-" * 80) |
|
|
|
scored_rows = [] |
|
for _, row in df.iterrows(): |
|
close = row["close"] |
|
upper = row["upper_bb"] |
|
lower = row["lower_bb"] |
|
ma5 = row["ma5"] |
|
ma20 = row["ma20"] |
|
|
|
if upper > close > ma5 > ma20: |
|
score = 5 |
|
elif close > upper: |
|
score = 4 |
|
elif lower < close < ma5 < ma20: |
|
score = 3 |
|
elif close < lower: |
|
score = 2 |
|
else: |
|
continue |
|
|
|
scored_rows.append((score, row)) |
|
|
|
scored_rows.sort(key=lambda x: x[0], reverse=True) |
|
|
|
found = bool(scored_rows) |
|
for score, row in scored_rows: |
|
close = row["close"] |
|
upper = row["upper_bb"] |
|
lower = row["lower_bb"] |
|
ma5 = row["ma5"] |
|
ma20 = row["ma20"] |
|
print( |
|
f"{row['symbol']:<12} {score:>5} {close:>10.2f} {upper:>10.2f} " |
|
f"{lower:>10.2f} {ma5:>10.2f} {ma20:>10.2f}" |
|
) |
|
|
|
if not found: |
|
print("No stocks matched any scoring condition.") |
|
|
|
|
|
def main(): |
|
parser = argparse.ArgumentParser(description="BB + MA5 tracker for TWSE stocks") |
|
parser.add_argument( |
|
"--date", |
|
type=lambda s: datetime.strptime(s, "%Y-%m-%d").date(), |
|
default=date.today(), |
|
help="As-of date in YYYY-MM-DD format (default: today)", |
|
) |
|
args = parser.parse_args() |
|
|
|
print(f"Scanning as of {args.date}\n") |
|
df = scan_symbols(args.date) |
|
score_and_print(df) |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |