Skip to content

Instantly share code, notes, and snippets.

@duyixian1234
Created March 20, 2026 07:34
Show Gist options
  • Select an option

  • Save duyixian1234/a565f309043a8b888ed6b47faa115066 to your computer and use it in GitHub Desktop.

Select an option

Save duyixian1234/a565f309043a8b888ed6b47faa115066 to your computer and use it in GitHub Desktop.
rss_fetch
# /// script
# dependencies = [
# "feedparser",
# ]
# ///
"""
RSS 订阅抓取脚本
脚本无需接收命令行参数,会并发抓取 DEFAULT_FEEDS 中最近一周的帖子,
并将所有帖子持久化到工作区根目录下的 SQLite 数据库 `rss_feeds.db`。
首次发现的新帖子会额外按发布日期保存到 `feeds/feeds-YYYY-MM-DD.md`。
帖子 URL 作为数据库唯一键,重复运行不会重复入库,也不会重复写入 Markdown。
"""
import sqlite3
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List
import feedparser
# Default RSS feed subscriptions fetched on every run.
DEFAULT_FEEDS = [
    "https://rsshub.rssforever.com/readhub/daily",
    "https://www.qbitai.com/feed",
    "https://www.oschina.net/news/rss",
]

# The workspace root is the parent of the directory that contains this script.
WORKSPACE_ROOT = Path(__file__).resolve().parent.parent
# SQLite database holding every post that has been seen.
DB_PATH = WORKSPACE_ROOT.joinpath("rss_feeds.db")
# Directory receiving the per-day Markdown digests.
FEEDS_DIR = WORKSPACE_ROOT.joinpath("feeds")
# Size of the look-back window, in days (default of get_recent_dates).
RECENT_DAYS = 7
def fetch_rss_feed(url: str) -> feedparser.FeedParserDict:
    """Fetch and parse the RSS/Atom feed at ``url``.

    Args:
        url: Address of the feed, passed straight to ``feedparser.parse``.

    Returns:
        The parsed feed object.

    Raises:
        RuntimeError: If feedparser flagged the feed (``bozo``) and no
            entries could be recovered from it.
    """
    feed = feedparser.parse(url)
    # feedparser sets ``bozo`` for any well-formedness problem, including
    # recoverable ones (e.g. a mis-declared encoding) where the entries are
    # still parsed. Only treat the fetch as failed when nothing was recovered.
    if feed.bozo and not feed.entries:
        raise RuntimeError(f"解析 RSS 失败: {feed.bozo_exception}")
    return feed
def get_entry_date(entry: feedparser.FeedParserDict) -> datetime:
    """Return the entry's publication time as a naive local-time datetime.

    ``published_parsed`` is preferred; ``updated_parsed`` is the fallback.
    feedparser normalizes both time tuples to UTC, so the value is converted
    to the system's local timezone. That keeps the derived calendar date
    consistent with dates computed from ``date.today()``, which is local.

    Args:
        entry: A feed entry exposing ``published_parsed`` and/or
            ``updated_parsed`` as attributes.

    Returns:
        A naive ``datetime`` expressed in local time.

    Raises:
        ValueError: If the entry carries no usable date.
    """
    for field in ("published_parsed", "updated_parsed"):
        parsed = getattr(entry, field, None)
        if not parsed:
            continue
        try:
            utc_time = datetime(*parsed[:6], tzinfo=timezone.utc)
            return utc_time.astimezone().replace(tzinfo=None)
        except (OverflowError, OSError) as err:
            # Some platforms cannot convert out-of-range timestamps to local
            # time; report that like any other unusable date.
            raise ValueError("无法找到条目日期") from err
    raise ValueError("无法找到条目日期")
def filter_entries_by_date(
    entries: List[feedparser.FeedParserDict],
    target_dates: List[str]
) -> List[Dict[str, Any]]:
    """Keep the entries whose date falls on one of ``target_dates``.

    Args:
        entries: Feed entries to inspect.
        target_dates: Accepted dates as ``YYYY-MM-DD`` strings.

    Returns:
        One dict per matching entry with the keys ``title``, ``link``,
        ``published`` (``YYYY-MM-DD HH:MM:SS``) and ``published_date``
        (``YYYY-MM-DD``). Entries without a usable date are skipped.

    Raises:
        ValueError: If a string in ``target_dates`` is not ``YYYY-MM-DD``.
    """
    # A set gives constant-time membership tests inside the loop.
    wanted_dates = {
        datetime.strptime(date_str, "%Y-%m-%d").date()
        for date_str in target_dates
    }
    filtered: List[Dict[str, Any]] = []
    for entry in entries:
        try:
            # Resolve the timestamp once and reuse it for both output fields.
            published_at = get_entry_date(entry)
        except (ValueError, AttributeError):
            continue
        if published_at.date() not in wanted_dates:
            continue
        filtered.append({
            "title": entry.get("title", "无标题"),
            "link": entry.get("link", ""),
            "published": published_at.strftime("%Y-%m-%d %H:%M:%S"),
            "published_date": published_at.strftime("%Y-%m-%d"),
        })
    return filtered
def get_recent_dates(days: int = RECENT_DAYS) -> List[str]:
    """Return the last ``days`` calendar dates, today included.

    The dates are ``YYYY-MM-DD`` strings ordered from newest (today) to
    oldest.
    """
    newest = date.today()
    recent: List[str] = []
    for days_back in range(days):
        day = newest - timedelta(days=days_back)
        recent.append(day.strftime("%Y-%m-%d"))
    return recent
def init_database() -> None:
    """Create the SQLite database file and the ``posts`` table if missing.

    The parent directory of ``DB_PATH`` is created when necessary. The post
    URL is the primary key of the table.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(DB_PATH)
    try:
        # ``with connection`` only commits or rolls back the transaction; it
        # does not close the connection, hence the explicit close() below.
        with connection:
            connection.execute(
                """
                CREATE TABLE IF NOT EXISTS posts (
                    url TEXT PRIMARY KEY,
                    title TEXT NOT NULL,
                    published TEXT NOT NULL,
                    published_date TEXT NOT NULL,
                    feed_title TEXT NOT NULL,
                    feed_url TEXT NOT NULL,
                    first_seen_at TEXT NOT NULL
                )
                """
            )
    finally:
        connection.close()
def persist_entries(entries: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Store posts in the database and return the ones that were new.

    Args:
        entries: Post dicts with the keys ``link``, ``title``, ``published``,
            ``published_date``, ``feed_title`` and ``feed_url``.

    Returns:
        The entries that were actually inserted, in input order. Posts whose
        URL is already stored (or repeated within ``entries``) are ignored.
    """
    insert_sql = """
        INSERT OR IGNORE INTO posts (
            url,
            title,
            published,
            published_date,
            feed_title,
            feed_url,
            first_seen_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?)
    """
    new_entries: List[Dict[str, Any]] = []
    # One shared UTC timestamp marks every post discovered by this run.
    first_seen_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S%z")
    connection = sqlite3.connect(DB_PATH)
    try:
        # ``with connection`` commits on success and rolls back on error, but
        # does not close the connection, hence the explicit close() below.
        with connection:
            for entry in entries:
                cursor = connection.execute(
                    insert_sql,
                    (
                        entry["link"],
                        entry["title"],
                        entry["published"],
                        entry["published_date"],
                        entry["feed_title"],
                        entry["feed_url"],
                        first_seen_at,
                    ),
                )
                # rowcount is 1 only when the row was inserted, i.e. the URL
                # was not already present.
                if cursor.rowcount == 1:
                    new_entries.append(entry)
    finally:
        connection.close()
    return new_entries
def write_new_entries_to_markdown(entries: Iterable[Dict[str, Any]]) -> List[Path]:
    """Append posts to per-day Markdown files under ``FEEDS_DIR``.

    Posts are grouped by ``published_date`` and appended to
    ``feeds-<date>.md``; a heading is written first when the file does not
    exist yet. Returns the touched files, ordered by date.
    """
    FEEDS_DIR.mkdir(parents=True, exist_ok=True)

    posts_by_day: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for post in entries:
        posts_by_day[post["published_date"]].append(post)

    touched: List[Path] = []
    for day in sorted(posts_by_day):
        target = FEEDS_DIR / f"feeds-{day}.md"
        # Must be checked before opening: append mode creates the file.
        needs_heading = not target.exists()
        with target.open("a", encoding="utf-8") as handle:
            if needs_heading:
                handle.write(f"# Feeds for {day}\n\n")
            for post in posts_by_day[day]:
                handle.write(
                    f"- [{post['title']}]({post['link']})\n"
                    f" - 来源: {post['feed_title']}\n"
                    f" - Feed: {post['feed_url']}\n"
                    f" - 发布时间: {post['published']}\n\n"
                )
        touched.append(target)
    return touched
def print_summary(
    total_entries: int,
    new_entries: List[Dict[str, Any]],
    written_files: List[Path],
    failures: List[Dict[str, Any]],
    target_dates: List[str],
) -> None:
    """Print a run report to standard output.

    The report covers the date range (last to first element of
    ``target_dates``), the number of scanned and newly stored posts, the
    Markdown files that were updated (relative to ``WORKSPACE_ROOT``) and
    the feeds that failed.
    """
    range_start, range_end = target_dates[-1], target_dates[0]
    print(
        f"抓取日期范围: {range_start} 至 {range_end}",
        f"扫描到的帖子总数: {total_entries}",
        f"新增入库帖子数: {len(new_entries)}",
        sep="\n",
    )

    if not written_files:
        print("没有新的帖子需要写入 Markdown。")
    else:
        print("已更新 Markdown 文件:")
        for path in written_files:
            print(f"- {path.relative_to(WORKSPACE_ROOT)}")

    if not failures:
        return
    print("抓取失败的订阅:")
    for failed in failures:
        reason = failed.get("error", "未知错误")
        print(f"- {failed['title']}: {reason}")
def fetch_and_filter_feed(url: str, target_dates: List[str]) -> Dict[str, Any]:
    """Fetch one feed and keep its posts that fall on ``target_dates``.

    Never raises for ordinary errors: any ``Exception`` is turned into a
    result dict with ``success`` set to ``False`` and the message under
    ``error``. On success the dict holds the feed title and the matching
    posts, each tagged with ``feed_title`` and ``feed_url``. Posts without a
    link are dropped.
    """
    try:
        feed = fetch_rss_feed(url)
        # Fall back to the URL when the feed does not declare a title.
        feed_title = feed.feed.get("title", url)
        tagged_posts = []
        for post in filter_entries_by_date(feed.entries, target_dates):
            if not post.get("link"):
                continue
            tagged_posts.append(dict(post, feed_title=feed_title, feed_url=url))
    except Exception as e:
        # Per-feed boundary: one failing feed must not abort the whole run.
        return {
            "url": url,
            "title": url,
            "entries": [],
            "success": False,
            "error": str(e),
        }
    return {
        "url": url,
        "title": feed_title,
        "entries": tagged_posts,
        "success": True,
    }
def fetch_default_feeds(target_dates: List[str]) -> None:
    """Fetch every feed in ``DEFAULT_FEEDS`` concurrently and store the result.

    New posts are persisted to the database, appended to the Markdown
    digests, and a summary is printed.

    Args:
        target_dates: Accepted publication dates as ``YYYY-MM-DD`` strings.

    Raises:
        SystemExit: With exit code 1 when ``DEFAULT_FEEDS`` is empty.
    """
    if not DEFAULT_FEEDS:
        print("错误: DEFAULT_FEEDS 列表为空,请先添加 RSS 订阅链接。")
        # The bare ``exit`` helper is injected by the ``site`` module for
        # interactive use and is not guaranteed to exist; raise directly.
        raise SystemExit(1)

    all_entries: List[Dict[str, Any]] = []
    failures: List[Dict[str, Any]] = []
    with ThreadPoolExecutor(max_workers=min(10, len(DEFAULT_FEEDS))) as executor:
        futures = [
            executor.submit(fetch_and_filter_feed, url, target_dates)
            for url in DEFAULT_FEEDS
        ]
        # Results are collected in completion order, not submission order.
        for future in as_completed(futures):
            result = future.result()
            if result["success"]:
                all_entries.extend(result["entries"])
            else:
                failures.append(result)

    new_entries = persist_entries(all_entries)
    written_files = write_new_entries_to_markdown(new_entries)
    print_summary(len(all_entries), new_entries, written_files, failures, target_dates)
def main() -> None:
    """Script entry point: prepare the database, then fetch the recent posts."""
    init_database()
    fetch_default_feeds(get_recent_dates())


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment