Skip to content

Instantly share code, notes, and snippets.

@ParinLL
Created April 28, 2026 06:18
Show Gist options
  • Select an option

  • Save ParinLL/e5f7869a9abf80c6e79313c78ddd5b93 to your computer and use it in GitHub Desktop.

Select an option

Save ParinLL/e5f7869a9abf80c6e79313c78ddd5b93 to your computer and use it in GitHub Desktop.
Obsidian scripts
#!/usr/bin/env python3
"""
將 Facebook public post 存成 Obsidian Clippings 筆記
用法:
python3 scripts/facebook-to-clippings.py <facebook_post_url>
python3 scripts/facebook-to-clippings.py <facebook_post_url> --cookies ~/cookies.txt
python3 scripts/facebook-to-clippings.py <facebook_post_url> --dry-run
python3 scripts/facebook-to-clippings.py <facebook_post_url> --manual
取得 cookies.txt:
1. 安裝 Chrome 擴充套件「Get cookies.txt LOCALLY」
2. 登入 Facebook 後,在 facebook.com 頁面點擊擴充套件 → Export
3. 儲存為 www.facebook.com_cookies.txt,放在 scripts/ 目錄下
或用 --cookies 指定路徑(支援全站 cookies.txt,會自動過濾只留 Facebook cookies)
解析策略:
- 貼文內文:優先用 og:description 或 <title> 作為 anchor,
再從 HTML JSON 找包含 anchor 開頭的完整 message.text
- 作者:og:title / <title> 取第一個「 - 」前的部分(限 60 字),
避免抓到登入者(JSON actor.name)
- 發文時間:用 text 內容定位 <script> 標籤,往後找 100,000 字內的
creation_time / publish_time(URL 和時間戳在 HTML 裡可能相距超過 100 萬字)
- 圖片:從 <link rel="preload" data-preloader="CometSinglePost..."> 抓取,
過濾 t39.30808-1(profile picture)
輸出:
Clippings/<貼文第一行>.md
"""
import re
import sys
import json
import argparse
import pathlib
import http.cookiejar
from datetime import datetime, timezone
# Vault root: the parent of the directory that contains this script.
VAULT = pathlib.Path(__file__).resolve().parent.parent
# Folder (directly under the vault root) that generated notes are written to.
CLIPPINGS_DIR = VAULT / "Clippings"
# Cookie file looked up next to this script when --cookies is not given.
DEFAULT_COOKIES = pathlib.Path(__file__).resolve().parent / "www.facebook.com_cookies.txt"
# ── helpers ──────────────────────────────────────────────
def sanitize_filename(name: str) -> str:
"""移除檔名不合法字元"""
return re.sub(r'[\\/:*?"<>|]', '-', name).strip()
def format_images(images: list) -> str:
    """Render image URLs as Markdown image embeds, one per line.

    Falsy entries are skipped; an empty (or falsy) list yields ``""``.
    """
    embeds = [f"![]({link})" for link in (images or []) if link]
    return "\n".join(embeds)
def build_markdown(post: dict) -> tuple[str, str]:
    """Build the note file name and its full Markdown text from a post dict.

    Keys read from *post* (all optional): ``post_url`` /
    ``original_request_url``, ``text`` / ``post_text``, ``username`` /
    ``user_id``, ``author_url``, ``time`` (``datetime`` or string),
    ``images`` / ``images_lowquality``, ``shared_text``, ``shared_from``.

    Returns:
        ``(filename, markdown)`` where *markdown* is YAML front matter
        followed by the body, and *filename* is derived from the first
        non-blank line of the post text (first 80 characters), falling back
        to ``"<author> - <published>"``.
    """
    date_fmt = "%Y-%m-%d"

    # ── basic fields ──
    source_url = post.get("post_url") or post.get("original_request_url", "")
    text = post.get("text") or post.get("post_text") or ""
    author = post.get("username") or post.get("user_id") or "unknown"
    author_url = post.get("author_url") or ""

    # Publication date: datetime objects are formatted, strings are cut to
    # their first 10 characters, anything else falls back to today.
    stamp = post.get("time")
    if isinstance(stamp, datetime):
        published = stamp.strftime(date_fmt)
    elif isinstance(stamp, str):
        published = stamp[:10]
    else:
        published = datetime.now().strftime(date_fmt)
    created = datetime.now().strftime(date_fmt)

    # Single-line description: whitespace collapsed, first 150 characters.
    description = " ".join(text.split())[:150]

    image_md = format_images(post.get("images") or post.get("images_lowquality") or [])
    shared_text = post.get("shared_text") or ""
    shared_from = post.get("shared_from") or ""

    # ── body ──
    sections = [chunk for chunk in (text, image_md) if chunk]
    if shared_text:
        quoted = "\n".join("> " + row for row in shared_text.splitlines())
        sections.append(
            f"**分享自 {shared_from}**\n\n{quoted}" if shared_from else quoted
        )
    body = "\n\n".join(sections) if sections else "(無法取得貼文內容)"

    # ── front matter ──
    import yaml

    class QuotedStr(str):
        """Marker type: values of this type are dumped with double quotes."""

    def quoted_representer(dumper, data):
        return dumper.represent_scalar("tag:yaml.org,2002:str", data, style='"')

    yaml.add_representer(QuotedStr, quoted_representer)

    byline = f"[{author}]({author_url})" if author_url else author
    front = {
        "title": QuotedStr(f"{author} on Facebook ({published})"),
        "source": QuotedStr(source_url),
        "author": [QuotedStr(byline)],
        "published": published,
        "created": created,
        "description": QuotedStr(description),
        "tags": [QuotedStr("clippings"), QuotedStr("facebook")],
    }
    yaml_str = yaml.dump(
        front,
        allow_unicode=True,
        default_flow_style=False,
        sort_keys=False,
        indent=2,
    )
    markdown = f"---\n{yaml_str}---\n\n{body}\n"

    # File name: first non-blank line of the text, else "<author> - <published>".
    first_line = ""
    for row in text.splitlines():
        if row.strip():
            first_line = row.strip()
            break
    name_source = first_line[:80] if first_line else f"{author} - {published}"
    return sanitize_filename(name_source + ".md"), markdown
def filter_facebook_cookies(src: pathlib.Path) -> pathlib.Path:
    """Copy only the Facebook entries of a Netscape cookies.txt to a temp file.

    Blank lines and ordinary ``#`` comments (including the file header) are
    kept. A cookie line is kept when it has at least six tab-separated fields
    and its domain field contains ``facebook``.

    Lines starting with ``#HttpOnly_`` are cookie records, not comments
    (``http.cookiejar.MozillaCookieJar`` loads them), so they are filtered by
    domain like any other cookie instead of being passed through.

    Args:
        src: Path of the cookies.txt file to read.

    Returns:
        Path of a newly created temporary ``.txt`` file. The file is not
        deleted automatically; the caller owns it.
    """
    import tempfile

    http_only_prefix = "#HttpOnly_"
    kept = []
    for line in src.read_text(encoding="utf-8", errors="ignore").splitlines():
        stripped = line.strip()
        is_http_only = stripped.startswith(http_only_prefix)
        if not stripped or (stripped.startswith("#") and not is_http_only):
            kept.append(line)
            continue
        # Drop the HttpOnly marker so the first field is the bare domain.
        record = stripped[len(http_only_prefix):] if is_http_only else stripped
        fields = record.split("\t")
        if len(fields) >= 6 and "facebook" in fields[0].lower():
            kept.append(line)

    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".txt", delete=False, encoding="utf-8"
    ) as tmp:
        tmp.write("\n".join(kept) + "\n")
    return pathlib.Path(tmp.name)
def load_cookies(cookies_path: pathlib.Path) -> http.cookiejar.MozillaCookieJar:
    """Load a Netscape cookies.txt into a cookie jar, Facebook entries only.

    When the file contains any entry whose domain field lacks ``facebook``
    (``#HttpOnly_``-prefixed records are inspected too, since the jar loads
    them as cookies), the file is first reduced with
    ``filter_facebook_cookies`` and the reduced copy is loaded instead.

    Args:
        cookies_path: Path of the cookies.txt file.

    Returns:
        The populated ``MozillaCookieJar`` (discard/expiry flags ignored).
    """
    http_only_prefix = "#HttpOnly_"

    def is_foreign(line: str) -> bool:
        entry = line.strip()
        if entry.startswith(http_only_prefix):
            entry = entry[len(http_only_prefix):]
        elif not entry or entry.startswith("#"):
            return False
        return "facebook" not in entry.split("\t")[0].lower()

    raw = cookies_path.read_text(encoding="utf-8", errors="ignore")
    has_non_fb = any(is_foreign(line) for line in raw.splitlines() if "\t" in line)

    jar = http.cookiejar.MozillaCookieJar()
    if not has_non_fb:
        jar.load(str(cookies_path), ignore_discard=True, ignore_expires=True)
        return jar

    filtered_path = filter_facebook_cookies(cookies_path)
    try:
        jar.load(str(filtered_path), ignore_discard=True, ignore_expires=True)
    finally:
        # The filtered copy holds session cookies; never leave it on disk.
        filtered_path.unlink(missing_ok=True)
    return jar
# Post timestamps (10-digit epoch seconds), in plain and backslash-escaped JSON.
_TIME_PATTERNS = (
    r'"publish_time"\s*:\s*(\d{10})',
    r'\\"publish_time\\":\s*(\d{10})',
    r'"creation_time"\s*:\s*(\d{10})',
    r'\\"creation_time\\":\s*(\d{10})',
)

# Raw (still JSON-escaped) post body text embedded in the page.
_MESSAGE_PATTERNS = (
    r'"message"\s*:\s*\{"text"\s*:\s*"((?:[^"\\]|\\.)*)"\}',
    r'"story_message"\s*:\s*\{"text"\s*:\s*"((?:[^"\\]|\\.)*)"\}',
    r'"body"\s*:\s*\{"text"\s*:\s*"((?:[^"\\]|\\.)*)"\}',
)


def _decode_json_string(raw: str) -> str:
    """Decode the inside of a JSON string literal captured without its quotes."""
    try:
        return json.loads(f'"{raw}"')
    except ValueError:
        # Not valid JSON (e.g. a raw control character): undo the common escapes.
        return raw.replace("\\n", "\n").replace('\\"', '"')


def _find_meta_content(page: str, prop: str) -> str:
    """Return the entity-decoded ``content`` of ``<meta property=prop>``, or ``""``."""
    from html import unescape

    name = re.escape(prop)
    for pattern in (
        rf'<meta[^>]+property="{name}"[^>]+content="([^"]+)"',
        rf'<meta[^>]+content="([^"]+)"[^>]+property="{name}"',
    ):
        found = re.search(pattern, page)
        if found:
            return unescape(found.group(1)).strip()
    return ""


def _page_title(page: str) -> str:
    """Return the entity-decoded text of the first ``<title>`` element, or ``""``."""
    from html import unescape

    found = re.search(r'<title>([^<]+)</title>', page)
    return unescape(found.group(1)).strip() if found else ""


def _author_from_title(raw_title: str) -> str | None:
    """Return the part of a title before the first ``-`` or ``|`` if it looks like a name."""
    first_part = re.split(r'\s*[-|]\s*', raw_title.strip(), maxsplit=1)[0].strip()
    if first_part and len(first_part) <= 60 and first_part.lower() not in ("error", "facebook"):
        return first_part
    return None


def _timestamp_to_date(raw_ts: str) -> str | None:
    """Format an epoch-seconds string as YYYY-MM-DD (local time) if it is in the accepted range."""
    ts = int(raw_ts)
    if 1262304000 < ts < 1893456000:
        return datetime.fromtimestamp(ts).strftime("%Y-%m-%d")
    return None


def _find_time_near(page: str, anchor_text: str, search_range: int = 5000) -> str | None:
    """Find a post timestamp close to where *anchor_text* occurs in *page*.

    The first 20 characters of the anchor are located both verbatim and in
    JSON-escaped form. Occurrences that appear to sit inside a ``<script>``
    element (judged from the preceding 200 characters) are preferred. For
    each occurrence, the *search_range* characters before it are searched
    (last match per pattern), then the 100,000 characters after it (first
    match per pattern).
    """
    if not anchor_text:
        return None
    needle = anchor_text[:20]
    positions = set()
    for candidate in {needle, json.dumps(needle)[1:-1]}:
        start = 0
        while (pos := page.find(candidate, start)) != -1:
            positions.add(pos)
            start = pos + 1
    if not positions:
        return None

    def inside_script(pos: int) -> bool:
        window = page[max(0, pos - 200):pos]
        return window.rfind('<script') > window.rfind('</script')

    in_script = [p for p in positions if inside_script(p)]
    for pos in sorted(in_script or positions):
        chunk_before = page[max(0, pos - search_range):pos]
        for pattern in _TIME_PATTERNS:
            matches = list(re.finditer(pattern, chunk_before))
            if matches:
                date = _timestamp_to_date(matches[-1].group(1))
                if date:
                    return date
        chunk_after = page[pos:pos + 100000]
        for pattern in _TIME_PATTERNS:
            found = re.search(pattern, chunk_after)
            if found:
                date = _timestamp_to_date(found.group(1))
                if date:
                    return date
    return None


def scrape_with_requests(url: str, cookies_path: pathlib.Path | None) -> dict | None:
    """Fetch a Facebook post page with ``requests`` and extract its content.

    The page embeds post data as JSON inside the HTML; it is extracted with
    regular expressions:

    * text    — ``og:description`` (or the ``<title>`` tail) is the anchor; the
                first embedded message text whose beginning contains the anchor
                wins. Without an anchor the longest message text (> 50 chars)
                is used. ``og:description`` itself is the last resort.
    * author  — first part of ``og:title`` / ``<title>``, then a ``/posts/``
                slug found in the page, then embedded actor/owner names.
    * date    — a timestamp near the text, else the first valid timestamp in
                the page, else today.
    * images  — ``CometSinglePost`` preload links, else embedded ``fbcdn.net``
                URIs; profile pictures (``t39.30808-1``) are excluded.

    Args:
        url: Post URL to fetch.
        cookies_path: Optional cookies.txt path; used only if the file exists.

    Returns:
        A dict with ``post_url``, ``original_request_url``, ``text``,
        ``username``, ``author_url``, ``time`` (YYYY-MM-DD) and ``images``
        (at most 10), or ``None`` when the request fails, the status is not
        200, or no usable text (>= 10 chars) is found. In the last case the
        HTML is saved next to this script as ``debug-fb-last.html``.

    Exits the process with status 1 when ``requests`` is not installed.
    """
    from html import unescape

    try:
        import requests
    except ImportError:
        # Only requests is needed: the page is parsed with regexes.
        print("❌ 請先安裝: pip install requests")
        sys.exit(1)

    session = requests.Session()
    session.headers.update({
        "User-Agent": (
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/124.0.0.0 Safari/537.36"
        ),
        "Accept-Language": "zh-TW,zh;q=0.9,en;q=0.8",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-User": "?1",
        "Sec-Fetch-Dest": "document",
    })
    if cookies_path and cookies_path.exists():
        session.cookies = load_cookies(cookies_path)
        print(f"🍪 使用 cookies: {cookies_path}")
    else:
        print("⚠️ 未提供 cookies,可能只能抓到部分資訊")

    print(f"🌐 抓取: {url}")
    try:
        resp = session.get(url, timeout=20, allow_redirects=True)
    except Exception as e:  # deliberate best effort: the caller falls back to manual input
        print(f"❌ 請求失敗: {e}")
        return None
    if resp.status_code != 200:
        print(f"❌ HTTP {resp.status_code}")
        return None
    page = resp.text

    # Step 1: anchor = beginning of the target post (og:description, else <title> tail).
    og_desc = _find_meta_content(page, "og:description")
    title_anchor = ""
    if not og_desc:
        parts = re.split(r'\s*-\s*', _page_title(page), maxsplit=1)
        if len(parts) > 1:
            title_anchor = parts[1].rstrip(".").strip()
    anchor = og_desc or title_anchor

    # Step 2: pick the embedded message text that belongs to the target post.
    decoded_messages = (
        _decode_json_string(m.group(1))
        for pattern in _MESSAGE_PATTERNS
        for m in re.finditer(pattern, page)
    )
    if anchor:
        anchor_start = anchor[:40].lower().strip()
        text = next(
            (d for d in decoded_messages
             if len(d) > 20 and anchor_start in d.lower()[:120]),
            "",
        )
    else:
        text = max((d for d in decoded_messages if len(d) > 50), key=len, default="")

    # Step 3: fall back to og:description itself.
    if not text:
        text = og_desc

    # Author: titles first, so the logged-in user's name in the JSON is not picked up.
    author = (
        _author_from_title(_find_meta_content(page, "og:title"))
        or _author_from_title(_page_title(page))
    )
    if not author:
        slug_in_page = re.search(r'facebook\.com/([a-zA-Z0-9._]+)/posts/', page)
        if slug_in_page:
            author = slug_in_page.group(1).replace(".", " ").title()
    if not author:
        for pattern in (
            r'"actor"\s*:\s*\{[^}]*"name"\s*:\s*"([^"]{2,60})"',
            r'"owner"\s*:\s*\{[^}]*"name"\s*:\s*"([^"]{2,60})"',
        ):
            found = re.search(pattern, page, re.DOTALL)
            if not found:
                continue
            candidate = found.group(1)
            try:
                candidate = json.loads(f'"{candidate}"')
            except ValueError:
                pass  # keep the raw capture
            # Skip identifiers that look like JS bundle/module names.
            if not re.match(r'^[a-z][a-zA-Z]+(?:Bundle|Worker|Module|Script)$', candidate):
                author = candidate
                break
    author = author or "unknown"

    # Publication date. None means "not found yet", so a post that really was
    # published today is not overwritten by an unrelated timestamp.
    published = _find_time_near(page, text) if text else None
    if published is None:
        for pattern in _TIME_PATTERNS:
            found = re.search(pattern, page)
            if found:
                published = _timestamp_to_date(found.group(1))
                if published:
                    break
    if published is None:
        published = datetime.now().strftime("%Y-%m-%d")

    # Images: preload links of the single-post view, profile pictures excluded.
    seen_urls = set()
    images = []
    for m in re.finditer(
        r'<link\b(?=[^>]*data-preloader="[^"]*CometSinglePost[^"]*")[^>]*\bhref="([^"]+)"[^>]*/?>',
        page,
    ):
        raw_url = unescape(m.group(1))
        if "fbcdn.net" in raw_url and "t39.30808-1" not in raw_url and raw_url not in seen_urls:
            seen_urls.add(raw_url)
            images.append(raw_url)
    if not images:
        excluded = ("emoji", "icon", "rsrc.php", "safe_image", "static", "t1.6435", "t39.30808-1")
        for m in re.finditer(
            r'"uri"\s*:\s*"(https:(?:\\/|/)[^"]*\.fbcdn\.net[^"]+)"',
            page,
        ):
            raw_url = m.group(1).replace("\\/", "/")
            if any(marker in raw_url for marker in excluded):
                continue
            if raw_url not in seen_urls:
                seen_urls.add(raw_url)
                images.append(raw_url)

    if not text or len(text) < 10:
        print("⚠️ 未能解析到貼文內容(Facebook 可能需要登入或已更改頁面結構)")
        debug_path = pathlib.Path(__file__).resolve().parent / "debug-fb-last.html"
        debug_path.write_text(page, encoding="utf-8")
        print(f" 已儲存 HTML 到 {debug_path} 供分析")
        return None

    slug_match = re.search(r'facebook\.com/([^/?]+)/posts/', url)
    author_url = f"https://www.facebook.com/{slug_match.group(1)}" if slug_match else ""
    return {
        "post_url": url,
        "original_request_url": url,
        "text": text,
        "username": author,
        "author_url": author_url,
        "time": published,
        "images": images[:10],
    }
def manual_input_fallback(url: str) -> dict:
"""讓使用者手動貼上內容"""
print()
print("=" * 60)
print("📋 手動輸入模式")
print(" 請在 Facebook 開啟貼文,複製貼文內容後貼到這裡")
print(" 輸入完畢後按 Enter 再輸入 END 結束")
print("=" * 60)
author = input("作者名稱: ").strip() or "unknown"
date_str = input("發文日期(YYYY-MM-DD,留空用今天): ").strip()
if not date_str:
date_str = datetime.now().strftime("%Y-%m-%d")
print("貼文內容(輸入 END 結束):")
lines = []
while True:
line = input()
if line.strip() == "END":
break
lines.append(line)
text = "\n".join(lines)
return {
"post_url": url,
"original_request_url": url,
"text": text,
"username": author,
"time": date_str,
"images": [],
}
# ── main ─────────────────────────────────────────────────
def main():
    """Command-line entry point: fetch (or hand-enter) a post and save it as a note.

    Flow: parse arguments → pick the cookies file (``--cookies``, else the
    default file next to this script if it exists) → scrape the post, falling
    back to manual input when scraping yields no text (or go straight to
    manual input with ``--manual``) → build the Markdown → print it → unless
    ``--dry-run``, write it under the Clippings folder, appending ``-1``,
    ``-2``, … to the file name if it already exists.
    """
    parser = argparse.ArgumentParser(
        description="將 Facebook public post 存成 Obsidian Clippings 筆記"
    )
    parser.add_argument("url", help="Facebook 貼文 URL")
    parser.add_argument(
        "--cookies",
        default=None,
        help="cookies.txt 路徑(Netscape 格式,預設找 scripts/www.facebook.com_cookies.txt)",
    )
    parser.add_argument(
        "--manual",
        action="store_true",
        help="跳過自動抓取,直接手動輸入內容",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="只印出結果,不寫入檔案",
    )
    args = parser.parse_args()

    if args.cookies:
        cookies_path = pathlib.Path(args.cookies).expanduser()
    elif DEFAULT_COOKIES.exists():
        cookies_path = DEFAULT_COOKIES
    else:
        cookies_path = None

    print(f"🔗 URL: {args.url}")
    if args.manual:
        post = manual_input_fallback(args.url)
    else:
        post = scrape_with_requests(args.url, cookies_path)
        if not post or not (post.get("text") or post.get("post_text")):
            print()
            print("⚠️ 自動抓取未取得內容,切換到手動輸入模式...")
            post = manual_input_fallback(args.url)
    # Always record the URL exactly as the user gave it.
    post["post_url"] = args.url
    post["original_request_url"] = args.url

    filename, markdown = build_markdown(post)
    output_path = CLIPPINGS_DIR / filename
    print()
    print("─" * 60)
    print(markdown)
    print("─" * 60)
    if args.dry_run:
        print(f"[DRY RUN] 不寫入檔案(預計路徑: {output_path})")
        return

    # A fresh vault may not have the Clippings folder yet.
    CLIPPINGS_DIR.mkdir(parents=True, exist_ok=True)

    # Never overwrite an existing note: add -1, -2, ... until the name is free.
    stem, suffix = output_path.stem, output_path.suffix
    counter = 1
    while output_path.exists():
        output_path = CLIPPINGS_DIR / f"{stem}-{counter}{suffix}"
        counter += 1
    output_path.write_text(markdown, encoding="utf-8")
    print(f"✅ 已儲存: {output_path.relative_to(VAULT)}")


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
"""
下載 Obsidian 筆記中的外部圖片 → 上傳到 R2 → 改寫 URL
用法:
python3 scripts/migrate-external-images.py --dry-run # 只列出不執行
python3 scripts/migrate-external-images.py --folder Notion/ # 只處理特定資料夾
python3 scripts/migrate-external-images.py --file path/to.md # 指定單一檔案
python3 scripts/migrate-external-images.py # 全量執行
設定(修改下方常數):
OWN_DOMAIN 你的圖床 domain(已在此 domain 的圖片會跳過)
API PicList server URL(含 picbed、configName、key 參數)
架構:
Obsidian → PicList Docker (NAS/server)
→ Cloudflare R2 (or any S3-compatible storage)
→ https://your-cdn-domain/
處理邏輯:
- 掃描 ![alt](外部圖片 URL) 格式(支援有/無副檔名)
- 跳過已指向自有圖床(OWN_DOMAIN)的圖片
- 透過 PicList API 讓 server 下載外部圖片並上傳到 R2
- 替換成功後改寫 md 檔案,全部完成才記錄到 done.log
日誌:
scripts/ext-migrate-output.log 執行紀錄
scripts/ext-migrate-done.log 已完成的筆記(跳過重複處理)
scripts/ext-migrate-failed.log 失敗的圖片 URL
"""
import re, json, sys, time, argparse, urllib.request, pathlib, os
from urllib.parse import urlsplit, urlunsplit, quote, unquote
# Switch stdout/stderr to UTF-8 (presumably so the non-ASCII log messages
# print on consoles whose default encoding is not UTF-8).
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
# Vault root: the parent of the directory that contains this script.
VAULT = pathlib.Path(__file__).resolve().parent.parent
# PicList API endpoint, format:
# http://<host>:<port>/upload?picbed=<uploader>&configName=<name>&key=<secret>
# NOTE(review): the secret key is part of the URL and the scheme is plain http.
API = "http://YOUR_PICLIST_HOST:36677/upload?picbed=aws-s3-plist&configName=R2&key=YOUR_SECRET_KEY"
# Images already hosted on this domain are skipped (set to your image-host domain).
OWN_DOMAIN = "your-cdn-domain.example.com"
# Notes that were fully migrated (one vault-relative path per line).
DONE_LOG = VAULT / "scripts" / "ext-migrate-done.log"
# Image URLs whose upload failed.
FAILED_LOG = VAULT / "scripts" / "ext-migrate-failed.log"
# Run log; overwritten on every run.
OUTPUT_LOG = VAULT / "scripts" / "ext-migrate-output.log"
# Alternation of recognised image file extensions (used inside EXTERNAL_IMG_RE).
IMG_EXT = r"png|jpe?g|gif|webp|svg|bmp|avif|tiff|ico"
# Matches the following forms (URLs on the own image host are not matched):
# 1. ![alt](url.ext)
# 2. ![alt](url.ext?query)
# 3. ![alt](url.ext "title")
# 4. ![alt](url) without an extension (Yahoo / CDN proxies, etc.)
# Group 1 is the "![alt]" part, group 2 the URL.
EXTERNAL_IMG_RE = re.compile(
r'(!\[[^\]]*\])\((https?://(?!(?:[^/]*\.)?' + re.escape(OWN_DOMAIN) + r')[^)\s"]+)'
r'(?:'
r'\.(?:' + IMG_EXT + r')(?:\?[^)\s"]*)?' # with a file extension
r'|'
r'(?:/[^)\s"]*)?' # without an extension (ends with a path)
r')\s*(?:"[^"]*")?\)',
re.IGNORECASE,
)
# Path components that exclude a note from processing.
SKIP_DIRS = {".trash", ".obsidian", "Templates", "scripts", ".git"}
# Open handle of the run log; None until main() opens it.
_log_fh = None
def log(msg=""):
    """Print *msg* immediately and, if the run log is open, append it there too."""
    print(msg, flush=True)
    if not _log_fh:
        return
    _log_fh.write(msg + "\n")
    _log_fh.flush()
def encode_url(url: str) -> str:
    """Percent-encode the non-ASCII parts of *url* (e.g. Chinese file names).

    The path is decoded and re-encoded, so existing escapes are preserved
    without being double-encoded. In the query and fragment only non-ASCII
    characters are encoded (as UTF-8); every ASCII character, including
    existing ``%XX`` escapes, is left exactly as it was. Scheme and host are
    not modified.
    """
    import re

    def encode_non_ascii(component: str) -> str:
        return re.sub(
            r"[^\x00-\x7f]+",
            lambda run: quote(run.group(0), safe=""),
            component,
        )

    parts = urlsplit(url)
    encoded_path = quote(unquote(parts.path), safe="/:@!$&'()*+,;=-._~")
    return urlunsplit((
        parts.scheme,
        parts.netloc,
        encoded_path,
        encode_non_ascii(parts.query),
        encode_non_ascii(parts.fragment),
    ))
def upload_url_to_r2(url: str):
    """Hand an external image URL to the PicList server for re-hosting.

    The (percent-encoded) URL is POSTed as JSON to ``API``; the server is
    expected to download and upload the image itself.

    Returns:
        The first entry of the response's ``result`` list when the response
        reports ``success``; otherwise ``None`` (the failure is logged).
    """
    body = json.dumps({"list": [encode_url(url)]}).encode()
    request = urllib.request.Request(
        API,
        data=body,
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=60) as resp:
            data = json.loads(resp.read())
        succeeded = data.get("success") and data.get("result")
        if succeeded:
            return data["result"][0]
        log(f" API 失敗: {data}")
    except Exception as e:
        log(f" API 錯誤: {e}")
    return None
def load_lines(path: pathlib.Path) -> set:
    """Return the distinct lines of the UTF-8 file at *path*, or an empty set if it is missing."""
    if not path.exists():
        return set()
    text = path.read_text(encoding="utf-8")
    return {row for row in text.splitlines()}
def main():
    """Re-host external images referenced by notes and rewrite the notes.

    Options: ``--dry-run`` (list only), ``--folder`` (limit to a sub-folder),
    ``--file`` (a single note; relative paths are resolved against the vault).

    For every note that is not in a skipped directory and not listed in the
    done log, each distinct external image URL is uploaded through
    ``upload_url_to_r2`` and replaced in the note. A note is appended to the
    done log only when no external image remains after it was rewritten.
    Failed uploads are appended to the failed log.
    """
    global _log_fh
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--folder", default="")
    parser.add_argument("--file", default="", help="指定單一 md 檔案(相對於 vault 的路徑)")
    args = parser.parse_args()

    _log_fh = open(OUTPUT_LOG, "w", encoding="utf-8")
    try:
        done_set = load_lines(DONE_LOG)
        if args.file:
            md_file = pathlib.Path(args.file)
            if not md_file.is_absolute():
                md_file = VAULT / args.file
            if not md_file.exists():
                log(f"找不到檔案: {md_file}")
                return
            candidates = [md_file]
        else:
            search_root = VAULT / args.folder if args.folder else VAULT
            candidates = sorted(search_root.rglob("*.md"))

        targets = []
        for md_file in candidates:
            rel = str(md_file.relative_to(VAULT))
            if any(part in SKIP_DIRS for part in pathlib.Path(rel).parts):
                continue
            if rel in done_set:
                continue
            content = md_file.read_text(encoding="utf-8")
            # Group 2 of each match is the URL; sorted for a reproducible order.
            urls = sorted({m[1] for m in EXTERNAL_IMG_RE.findall(content)})
            if urls:
                targets.append((md_file, rel, urls))

        total_imgs = sum(len(urls) for _, _, urls in targets)
        log("=== 外部圖片 → R2 批次遷移 ===")
        log(f"日誌: {OUTPUT_LOG}")
        log(f"共 {len(targets)} 篇筆記、{total_imgs} 個外部圖片")
        if args.dry_run:
            log("[DRY RUN] 只列出不實際執行")
        log()

        stats = {"ok": 0, "skip": 0, "fail": 0}
        for idx, (md_file, rel, urls) in enumerate(targets, 1):
            log(f"[{idx}/{len(targets)}] {rel} ({len(urls)} 張)")
            content = md_file.read_text(encoding="utf-8")
            changed = False
            for url in urls:
                if url not in content:
                    log(" ⏭ 已無此 URL,跳過")
                    stats["skip"] += 1
                    continue
                short_url = url[:80] + ("..." if len(url) > 80 else "")
                if args.dry_run:
                    log(f" → {short_url}")
                    continue
                remote_url = upload_url_to_r2(url)
                if not remote_url:
                    log(f" ✗ {short_url} (上傳失敗)")
                    with open(FAILED_LOG, "a", encoding="utf-8") as fh:
                        fh.write(f"{rel} | {url} | UPLOAD_FAILED\n")
                    stats["fail"] += 1
                    time.sleep(2)
                    continue
                # Replace only complete occurrences: the URL must be followed by
                # ")", whitespace, a quote or end of text. A plain str.replace
                # would also rewrite the head of a longer URL sharing this prefix.
                whole_url = re.compile(re.escape(url) + r'(?![^)\s"])')
                new_content = whole_url.sub(lambda _m: remote_url, content)
                if new_content != content:
                    content = new_content
                    changed = True
                    stats["ok"] += 1
                    log(f" ✓ {short_url}")
                else:
                    log(f" ⚠ 替換未生效: {short_url}")
                    stats["fail"] += 1
                time.sleep(0.5)

            if not args.dry_run and changed:
                md_file.write_text(content, encoding="utf-8")
                verify = md_file.read_text(encoding="utf-8")
                remaining = len(EXTERNAL_IMG_RE.findall(verify))
                if remaining == 0:
                    log(" 📝 已寫入,外部圖片全部替換 ✓")
                    with open(DONE_LOG, "a", encoding="utf-8") as fh:
                        fh.write(rel + "\n")
                else:
                    log(f" 📝 已寫入,仍有 {remaining} 個外部圖片")
            log()

        log("=== 完成 ===")
        log(f"成功: {stats['ok']}")
        log(f"跳過: {stats['skip']}")
        log(f"失敗: {stats['fail']}")
        if stats["fail"] > 0:
            log(f"失敗紀錄: {FAILED_LOG}")
    finally:
        # Close the run log even when a note cannot be read or an upload raises.
        _log_fh.close()
        _log_fh = None


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
"""
批次上傳 Obsidian 本地附件到 Cloudflare R2 並改寫 .md 引用
用法:
python3 scripts/migrate-to-r2.py --dry-run # 只列出不執行
python3 scripts/migrate-to-r2.py --folder "Notion/" # 只處理特定資料夾
python3 scripts/migrate-to-r2.py --file path/to/note.md # 指定單一檔案
python3 scripts/migrate-to-r2.py # 全量執行
設定(修改下方常數):
ATTACH_DIR 本地附件目錄
API PicList server URL(含 picbed、configName、key 參數)
日誌自動寫到 scripts/migrate-output.log
"""
import re, json, time, argparse, urllib.request, pathlib, hashlib
# Vault root: the parent of the directory that contains this script.
VAULT = pathlib.Path(__file__).resolve().parent.parent
ATTACH_DIR = VAULT / "attachments" # ← change this to your attachments directory
# PicList API endpoint, format:
# http://<host>:<port>/upload?picbed=<uploader>&configName=<name>&key=<secret>
# NOTE(review): the secret key is part of the URL and the scheme is plain http.
API = "http://YOUR_PICLIST_HOST:36677/upload?picbed=aws-s3-plist&configName=R2&key=YOUR_SECRET_KEY"
# Notes that were fully migrated (one vault-relative path per line).
DONE_LOG = VAULT / "scripts" / "migrate-done.log"
# Attachments that could not be found or uploaded.
FAILED_LOG = VAULT / "scripts" / "migrate-failed.log"
# Run log; overwritten on every run.
OUTPUT_LOG = VAULT / "scripts" / "migrate-output.log"
# Alternation of attachment file extensions recognised in wikilinks.
EXTENSIONS = (
"png|jpg|jpeg|gif|webp|svg|bmp|tiff|ico|avif|"
"mp4|webm|ogv|mov|mkv|mp3|wav|ogg|m4a|flac|3gp|aac|pdf"
)
# Matches ![[file.ext]] or ![[file.ext|alias]]; captures the file name (without |alias).
WIKILINK_RE = re.compile(
r'!\[\[([^|\]]+\.(?:' + EXTENSIONS + r'))(?:\|[^\]]*)?\]\]',
re.IGNORECASE,
)
# Path components that exclude a file from processing.
SKIP_DIRS = {".trash", ".obsidian", "Templates", "scripts", ".git"}
# Open handle of the run log; None until main() opens it.
_log_fh = None
# ── helpers ──────────────────────────────────────────────
def log(msg=""):
    """Echo *msg* to stdout right away and mirror it into the run log when one is open."""
    print(msg, flush=True)
    if not _log_fh:
        return
    _log_fh.write(msg + "\n")
    _log_fh.flush()
def find_local_file(filename: str):
    """Locate an attachment by name.

    The attachments directory is tried first; otherwise the whole vault is
    searched recursively and the first hit (in sorted path order) that is not
    inside a skipped directory is returned.

    Args:
        filename: Attachment name exactly as written in the wikilink.

    Returns:
        The path of the file, or ``None`` if it cannot be found.
    """
    import glob

    candidate = ATTACH_DIR / filename
    if candidate.exists():
        return candidate
    # Escape glob metacharacters so names such as "shot [1].png" are matched
    # literally instead of being read as a pattern.
    for hit in sorted(VAULT.rglob(glob.escape(filename))):
        # Judge skipped directories by the vault-relative path only, so a
        # directory above the vault that happens to be called e.g. "scripts"
        # does not exclude everything.
        if not any(part in SKIP_DIRS for part in hit.relative_to(VAULT).parts):
            return hit
    return None
def file_md5(path: pathlib.Path) -> str:
    """Return the hexadecimal MD5 digest of the file's bytes."""
    digest = hashlib.md5()
    digest.update(path.read_bytes())
    return digest.hexdigest()
def upload_file(local_path: pathlib.Path):
    """Ask the PicList server to upload a local file.

    The file's path (as a string) is POSTed as JSON to ``API``.

    Args:
        local_path: Path of the attachment to upload.

    Returns:
        The first entry of the response's ``result`` list when the response
        reports ``success``; otherwise ``None``. Both transport errors and
        failure responses from the API are logged.
    """
    payload = json.dumps({"list": [str(local_path)]}).encode()
    req = urllib.request.Request(
        API, data=payload, headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=60) as resp:
            data = json.loads(resp.read())
        if data.get("success") and data.get("result"):
            return data["result"][0]
        # Record why the server refused; otherwise the failure is undiagnosable.
        log(f" API 失敗: {data}")
    except Exception as e:
        log(f" API 錯誤: {e}")
    return None
def load_lines(path: pathlib.Path) -> set:
    """Read the UTF-8 file at *path* into a set of its lines; a missing file gives an empty set."""
    if not path.exists():
        return set()
    text = path.read_text(encoding="utf-8")
    return {row for row in text.splitlines()}
# ── main ─────────────────────────────────────────────────
def main():
    """Upload local attachments referenced by notes and rewrite the wikilinks.

    Options: ``--dry-run`` (list only), ``--folder`` (limit to a sub-folder),
    ``--file`` (a single note; relative paths are resolved against the vault).

    For every note that is not in a skipped directory and not listed in the
    done log, each distinct ``![[file.ext]]`` attachment is located, uploaded
    through ``upload_file`` and replaced by ``![stem](remote_url)``. A note
    is appended to the done log only when no wikilink remains after it was
    rewritten. Missing files and failed uploads are appended to the failed
    log (not in dry-run mode, which writes nothing but the run log).
    """
    global _log_fh
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true", help="只列出不實際執行")
    parser.add_argument("--folder", default="", help="只處理特定子資料夾")
    parser.add_argument("--file", default="", help="指定單一 md 檔案(相對於 vault 的路徑)")
    args = parser.parse_args()

    _log_fh = open(OUTPUT_LOG, "w", encoding="utf-8")
    try:
        done_set = load_lines(DONE_LOG)
        if args.file:
            md_file = pathlib.Path(args.file)
            if not md_file.is_absolute():
                md_file = VAULT / args.file
            if not md_file.exists():
                log(f"找不到檔案: {md_file}")
                return
            candidates = [md_file]
        else:
            search_root = VAULT / args.folder if args.folder else VAULT
            candidates = sorted(search_root.rglob("*.md"))

        targets = []
        for md_file in candidates:
            rel = str(md_file.relative_to(VAULT))
            if any(part in SKIP_DIRS for part in pathlib.Path(rel).parts):
                continue
            if rel in done_set:
                continue
            content = md_file.read_text(encoding="utf-8")
            filenames = sorted(set(WIKILINK_RE.findall(content)))
            if filenames:
                targets.append((md_file, rel, filenames))

        log("=== Obsidian → R2 批次遷移 ===")
        log(f"日誌: {OUTPUT_LOG}")
        total_imgs = sum(len(fns) for _, _, fns in targets)
        log(f"共 {len(targets)} 篇筆記、{total_imgs} 個附件引用需要處理")
        if args.dry_run:
            log("[DRY RUN] 只列出不實際執行")
        log()

        stats = {"ok": 0, "skip": 0, "fail": 0}
        for idx, (md_file, rel, filenames) in enumerate(targets, 1):
            log(f"[{idx}/{len(targets)}] {rel} ({len(filenames)} 個附件)")
            content = md_file.read_text(encoding="utf-8")
            changed = False
            for filename in filenames:
                pattern = re.compile(
                    r'!\[\[' + re.escape(filename) + r'(?:\|[^\]]*)?\]\]'
                )
                if not pattern.search(content):
                    log(f" ⏭ {filename} (已無 wikilink,跳過)")
                    stats["skip"] += 1
                    continue
                local_path = find_local_file(filename)
                if local_path is None:
                    log(f" ⚠ 找不到: {filename}")
                    if not args.dry_run:
                        # utf-8 explicitly: note paths are often non-ASCII.
                        with open(FAILED_LOG, "a", encoding="utf-8") as fh:
                            fh.write(f"{rel} | {filename} | FILE_NOT_FOUND\n")
                    stats["fail"] += 1
                    continue
                if args.dry_run:
                    log(f" → {filename} ({local_path})")
                    continue
                remote_url = upload_file(local_path)
                if not remote_url:
                    log(f" ✗ {filename} (上傳失敗)")
                    with open(FAILED_LOG, "a", encoding="utf-8") as fh:
                        fh.write(f"{rel} | {filename} | UPLOAD_FAILED\n")
                    stats["fail"] += 1
                    time.sleep(2)
                    continue
                alt = pathlib.Path(filename).stem
                replacement = f"![{alt}]({remote_url})"
                # A callable replacement is inserted literally; a plain string
                # would have its backslashes interpreted as regex escapes.
                new_content = pattern.sub(lambda _m: replacement, content)
                if new_content != content:
                    content = new_content
                    changed = True
                    stats["ok"] += 1
                    log(f" ✓ {filename}")
                else:
                    log(f" ⚠ {filename} 替換未生效(regex 未匹配)")
                    stats["fail"] += 1
                time.sleep(0.3)

            if not args.dry_run and changed:
                md_file.write_text(content, encoding="utf-8")
                verify = md_file.read_text(encoding="utf-8")
                remaining = len(WIKILINK_RE.findall(verify))
                if remaining == 0:
                    log(" 📝 已寫入,wikilink 全部清除 ✓")
                    # Written as utf-8 because load_lines() reads it back as utf-8.
                    with open(DONE_LOG, "a", encoding="utf-8") as fh:
                        fh.write(rel + "\n")
                else:
                    log(f" 📝 已寫入,仍有 {remaining} 個 wikilink 未處理")
            log()

        log("=== 完成 ===")
        log(f"上傳成功: {stats['ok']}")
        log(f"已跳過: {stats['skip']}")
        log(f"失敗: {stats['fail']}")
        if stats["fail"] > 0:
            log(f"失敗紀錄: {FAILED_LOG}")
    finally:
        # Close the run log even when a note cannot be read or an upload raises.
        _log_fh.close()
        _log_fh = None


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment