Created
March 20, 2026 07:34
-
-
Save duyixian1234/a565f309043a8b888ed6b47faa115066 to your computer and use it in GitHub Desktop.
rss_fetch
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # /// script | |
| # dependencies = [ | |
| # "feedparser", | |
| # ] | |
| # /// | |
| """ | |
| RSS 订阅抓取脚本 | |
| 脚本无需接收命令行参数,会并发抓取 DEFAULT_FEEDS 中最近一周的帖子, | |
| 并将所有帖子持久化到工作区根目录下的 SQLite 数据库 `rss_feeds.db`。 | |
| 首次发现的新帖子会额外按发布日期保存到 `feeds/feeds-YYYY-MM-DD.md`。 | |
| 帖子 URL 作为数据库唯一键,重复运行不会重复入库,也不会重复写入 Markdown。 | |
| """ | |
| import sqlite3 | |
| from collections import defaultdict | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from datetime import date, datetime, timedelta, timezone | |
| from pathlib import Path | |
| from typing import Any, Dict, Iterable, List | |
| import feedparser | |
# Default RSS feed subscriptions.
DEFAULT_FEEDS = [
    "https://rsshub.rssforever.com/readhub/daily",
    "https://www.qbitai.com/feed",
    "https://www.oschina.net/news/rss",
]

# The workspace root is the directory one level above this script's folder.
WORKSPACE_ROOT = Path(__file__).resolve().parents[1]
# SQLite database that stores every post seen so far.
DB_PATH = WORKSPACE_ROOT / "rss_feeds.db"
# Directory that receives the per-day Markdown digests.
FEEDS_DIR = WORKSPACE_ROOT / "feeds"

# Number of most recent days (today included) whose posts are collected.
RECENT_DAYS = 7
def fetch_rss_feed(url: str) -> feedparser.FeedParserDict:
    """Fetch and parse the RSS feed at ``url``.

    feedparser sets ``bozo`` for any well-formedness problem, including
    recoverable ones where the entries were still parsed. Such feeds are
    returned as-is; only a flagged feed that produced no entries is treated
    as a failure.

    Args:
        url: Address of the feed to download and parse.

    Returns:
        The parsed feed.

    Raises:
        RuntimeError: If the feed is flagged as malformed and no entries
            could be extracted from it.
    """
    feed = feedparser.parse(url)
    # A flagged feed that still yielded entries is usable; do not discard it.
    if feed.bozo and not feed.entries:
        raise RuntimeError(f"解析 RSS 失败: {feed.bozo_exception}")
    return feed
def get_entry_date(entry: feedparser.FeedParserDict) -> datetime:
    """Return the entry's timestamp as a naive ``datetime``.

    ``published_parsed`` is preferred and ``updated_parsed`` is the fallback.
    The first six fields (year through second) of the chosen time tuple are
    used to build the result.

    Raises:
        ValueError: If neither attribute is present with a truthy value.
    """
    for field_name in ("published_parsed", "updated_parsed"):
        parsed_time = getattr(entry, field_name, None)
        if parsed_time:
            return datetime(*parsed_time[:6])
    raise ValueError("无法找到条目日期")
def filter_entries_by_date(
    entries: List[feedparser.FeedParserDict],
    target_dates: List[str]
) -> List[Dict[str, Any]]:
    """Keep the entries whose date is one of ``target_dates``.

    Args:
        entries: Parsed feed entries.
        target_dates: Accepted days as ``YYYY-MM-DD`` strings.

    Returns:
        One dict per matching entry with the keys ``title``, ``link``,
        ``published`` (``YYYY-MM-DD HH:MM:SS``) and ``published_date``
        (``YYYY-MM-DD``). Entries whose date cannot be determined are skipped.

    Raises:
        ValueError: If a string in ``target_dates`` is not ``YYYY-MM-DD``.
    """
    wanted_days = {
        datetime.strptime(text, "%Y-%m-%d").date() for text in target_dates
    }

    matches: List[Dict[str, Any]] = []
    for item in entries:
        try:
            stamp = get_entry_date(item)
            day = stamp.date()
            if day not in wanted_days:
                continue
            matches.append({
                "title": item.get("title", "无标题"),
                "link": item.get("link", ""),
                "published": stamp.strftime("%Y-%m-%d %H:%M:%S"),
                "published_date": day.strftime("%Y-%m-%d"),
            })
        except (ValueError, AttributeError):
            # Undated or malformed entries are dropped rather than aborting the feed.
            continue
    return matches
def get_recent_dates(days: int = RECENT_DAYS) -> List[str]:
    """Return the most recent ``days`` dates (today included), newest first.

    "Today" is taken in UTC because the feedparser time tuples that entry
    dates are built from are expressed in UTC. Using the machine's local date
    would shift the window relative to the entry dates whenever the local
    date and the UTC date differ.

    Args:
        days: Number of dates to return; values below 1 yield an empty list.

    Returns:
        ``YYYY-MM-DD`` strings ordered from today backwards.
    """
    today = datetime.now(timezone.utc).date()
    return [
        (today - timedelta(days=offset)).isoformat()
        for offset in range(days)
    ]
def init_database() -> None:
    """Create the SQLite database at ``DB_PATH`` and its ``posts`` table.

    The parent directory is created when missing and the table is only
    created if it does not exist yet, so calling this repeatedly is safe.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(DB_PATH)
    try:
        # ``with connection`` commits on success and rolls back on error, but
        # it does not close the connection; that is done in ``finally``.
        with connection:
            connection.execute(
                """
                CREATE TABLE IF NOT EXISTS posts (
                    url TEXT PRIMARY KEY,
                    title TEXT NOT NULL,
                    published TEXT NOT NULL,
                    published_date TEXT NOT NULL,
                    feed_title TEXT NOT NULL,
                    feed_url TEXT NOT NULL,
                    first_seen_at TEXT NOT NULL
                )
                """
            )
    finally:
        connection.close()
def persist_entries(entries: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Store posts in the database and return the ones that were new.

    Each entry is inserted with ``INSERT OR IGNORE`` keyed on its URL, so a
    post that is already stored is left untouched and is not returned.

    Args:
        entries: Post dicts carrying the keys ``link``, ``title``,
            ``published``, ``published_date``, ``feed_title`` and ``feed_url``.

    Returns:
        The entries that were actually inserted by this call, in input order.
    """
    new_entries: List[Dict[str, Any]] = []
    # One shared UTC timestamp marks every post first seen during this run.
    first_seen_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S%z")

    connection = sqlite3.connect(DB_PATH)
    try:
        # ``with connection`` commits on success and rolls back on error, but
        # it does not close the connection; that is done in ``finally``.
        with connection:
            for entry in entries:
                cursor = connection.execute(
                    """
                    INSERT OR IGNORE INTO posts (
                        url,
                        title,
                        published,
                        published_date,
                        feed_title,
                        feed_url,
                        first_seen_at
                    ) VALUES (?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        entry["link"],
                        entry["title"],
                        entry["published"],
                        entry["published_date"],
                        entry["feed_title"],
                        entry["feed_url"],
                        first_seen_at,
                    ),
                )
                # rowcount is 1 only when the row was inserted, not ignored.
                if cursor.rowcount == 1:
                    new_entries.append(entry)
    finally:
        connection.close()
    return new_entries
def write_new_entries_to_markdown(entries: Iterable[Dict[str, Any]]) -> List[Path]:
    """Append posts to per-day Markdown files under ``FEEDS_DIR``.

    Posts are grouped by ``published_date`` and appended to
    ``feeds-<date>.md``; a heading is written first when the file is new.

    Args:
        entries: Post dicts carrying the keys ``published_date``, ``title``,
            ``link``, ``feed_title``, ``feed_url`` and ``published``.

    Returns:
        The files that were written, ordered by date.
    """
    FEEDS_DIR.mkdir(parents=True, exist_ok=True)

    posts_by_day: Dict[str, List[Dict[str, Any]]] = {}
    for post in entries:
        posts_by_day.setdefault(post["published_date"], []).append(post)

    touched: List[Path] = []
    for day in sorted(posts_by_day):
        target = FEEDS_DIR / f"feeds-{day}.md"
        # Checked before opening, because append mode creates the file.
        needs_header = not target.exists()
        with target.open("a", encoding="utf-8") as handle:
            if needs_header:
                handle.write(f"# Feeds for {day}\n\n")
            for post in posts_by_day[day]:
                handle.write(
                    f"- [{post['title']}]({post['link']})\n"
                    f" - 来源: {post['feed_title']}\n"
                    f" - Feed: {post['feed_url']}\n"
                    f" - 发布时间: {post['published']}\n\n"
                )
        touched.append(target)
    return touched
def print_summary(
    total_entries: int,
    new_entries: List[Dict[str, Any]],
    written_files: List[Path],
    failures: List[Dict[str, Any]],
    target_dates: List[str],
) -> None:
    """Print a run report to standard output.

    Args:
        total_entries: Number of posts scanned in this run.
        new_entries: Posts that were newly stored.
        written_files: Markdown files updated in this run; each is shown
            relative to ``WORKSPACE_ROOT``.
        failures: Result dicts of feeds that could not be fetched; ``title``
            is required and ``error`` is optional.
        target_dates: Covered dates, newest first; must not be empty.
    """
    newest, oldest = target_dates[0], target_dates[-1]
    print(f"抓取日期范围: {oldest} 至 {newest}")
    print(f"扫描到的帖子总数: {total_entries}")
    print(f"新增入库帖子数: {len(new_entries)}")

    if not written_files:
        print("没有新的帖子需要写入 Markdown。")
    else:
        print("已更新 Markdown 文件:")
        for path in written_files:
            print(f"- {path.relative_to(WORKSPACE_ROOT)}")

    if not failures:
        return
    print("抓取失败的订阅:")
    for failed in failures:
        reason = failed.get('error', '未知错误')
        print(f"- {failed['title']}: {reason}")
def fetch_and_filter_feed(url: str, target_dates: List[str]) -> Dict[str, Any]:
    """Fetch one feed and keep its posts dated within ``target_dates``.

    Any exception is converted into a failure result instead of being raised.

    Args:
        url: Feed address.
        target_dates: Accepted days as ``YYYY-MM-DD`` strings.

    Returns:
        A dict with ``url``, ``title``, ``entries`` and ``success``; failures
        additionally carry ``error`` and use the URL as the title. Each
        returned entry has a non-empty ``link`` and is tagged with
        ``feed_title`` and ``feed_url``.
    """
    try:
        parsed = fetch_rss_feed(url)
        source_name = parsed.feed.get("title", url)
        tagged: List[Dict[str, Any]] = []
        for post in filter_entries_by_date(parsed.entries, target_dates):
            # Posts without a link cannot be keyed in the database; skip them.
            if not post.get("link"):
                continue
            tagged.append(dict(post, feed_title=source_name, feed_url=url))
    except Exception as e:
        return {
            "url": url,
            "title": url,
            "entries": [],
            "success": False,
            "error": str(e)
        }
    return {
        "url": url,
        "title": source_name,
        "entries": tagged,
        "success": True
    }
def fetch_default_feeds(target_dates: List[str]) -> None:
    """Fetch every feed in ``DEFAULT_FEEDS`` concurrently and record the posts.

    The collected posts are persisted to the database, the newly stored ones
    are written to Markdown, and a summary is printed.

    Args:
        target_dates: Accepted days as ``YYYY-MM-DD`` strings, newest first.

    Raises:
        SystemExit: With status 1 when ``DEFAULT_FEEDS`` is empty.
    """
    if not DEFAULT_FEEDS:
        print("错误: DEFAULT_FEEDS 列表为空,请先添加 RSS 订阅链接。")
        # The ``exit`` helper is injected by the ``site`` module for
        # interactive use; SystemExit is the reliable way to stop a script.
        raise SystemExit(1)

    all_entries: List[Dict[str, Any]] = []
    failures: List[Dict[str, Any]] = []
    with ThreadPoolExecutor(max_workers=min(10, len(DEFAULT_FEEDS))) as executor:
        futures = [
            executor.submit(fetch_and_filter_feed, url, target_dates)
            for url in DEFAULT_FEEDS
        ]
        # Results are read in submission order, not completion order, so the
        # order of stored posts and of Markdown lines does not depend on
        # network timing. The downloads still run concurrently.
        for future in futures:
            result = future.result()
            if result["success"]:
                all_entries.extend(result["entries"])
            else:
                failures.append(result)

    new_entries = persist_entries(all_entries)
    written_files = write_new_entries_to_markdown(new_entries)
    print_summary(len(all_entries), new_entries, written_files, failures, target_dates)
def main() -> None:
    """Run the script: prepare the database, then fetch the recent posts."""
    init_database()
    fetch_default_feeds(get_recent_dates())


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment