Created
June 23, 2026 22:04
-
-
Save erikh/6217bee7c59eaf064802e6a73157eac9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """Copy Bluesky posts -> Mastodon, skipping ones whose text already exists on Mastodon. | |
| Resumable: records posted Bluesky URIs in STATE_FILE. Honors Mastodon rate limits.""" | |
| import os, sys, json, re, time, html, io | |
| import requests | |
# Mastodon server that receives the copied posts.
INSTANCE = "https://mastodon.social"
# Input file of exported Bluesky posts, and the ledger of already-handled URIs.
BSKY_FILE = "/tmp/bsky_posts.json"
STATE_FILE = "/tmp/bsky_sync_state.json"
# Seconds to pause between posts ("slowly"); override with $SYNC_DELAY.
DELAY = float(os.getenv("SYNC_DELAY", "6"))
# DRY_RUN=1 prints the plan instead of posting.
DRY_RUN = os.getenv("DRY_RUN") == "1"
def get_token():
    """Return the Mastodon API access token.

    ``$MASTODON_TOKEN`` is consulted first.  When it is unset or empty, the
    contents of ``~/.config/bsky-sync/token`` are used instead, with
    surrounding whitespace stripped.

    Exits the process via ``sys.exit`` with an error message when neither
    source yields a non-empty token.
    """
    token = os.environ.get("MASTODON_TOKEN")
    if not token:
        path = os.path.expanduser("~/.config/bsky-sync/token")
        try:
            # The context manager closes the handle deterministically.
            with open(path) as fh:
                token = fh.read().strip()
        except (FileNotFoundError, NotADirectoryError):
            # No token file configured; fall through to the error below.
            token = None
    if not token:
        sys.exit("ERROR: no token. Set $MASTODON_TOKEN or write ~/.config/bsky-sync/token")
    return token
# Resolved once at import time; get_token() exits the process if none is found.
TOKEN = get_token()
# Headers sent with the Mastodon API requests made by this script.
H = {
    "Authorization": f"Bearer {TOKEN}",
    "User-Agent": "bsky-sync/1.0",
}
def norm(text):
    """Return the dedup key for *text*.

    URLs are removed, every whitespace run becomes a single space, the ends
    are trimmed and the result is lower-cased.  HTML tags are expected to
    have been stripped by the caller already.
    """
    without_urls = re.sub(r"https?://\S+", "", text)
    single_spaced = re.sub(r"\s+", " ", without_urls)
    return single_spaced.strip().lower()
def strip_html(s):
    """Convert an HTML fragment to plain text.

    ``<br>`` becomes a newline, ``</p>`` becomes a blank line, all remaining
    tags are dropped, HTML entities are decoded, and the result is trimmed.
    """
    # Applied in order: line breaks first, then paragraph ends, then any tag.
    rewrites = (
        (r"<br\s*/?>", "\n", re.I),
        (r"</p>", "\n\n", re.I),
        (r"<[^>]+>", "", 0),
    )
    for pattern, replacement, flags in rewrites:
        s = re.sub(pattern, replacement, s, flags=flags)
    return html.unescape(s).strip()
def _reset_delay(reset):
    """Return how many seconds to sleep given an X-RateLimit-Reset value.

    *reset* is expected to be an ISO 8601 timestamp.  Falls back to 60
    seconds when it is missing or cannot be interpreted; otherwise the delay
    is the time remaining plus a 2 second margin, never less than 5 seconds.
    """
    # Local import: datetime is not among this module's top-level imports.
    import datetime

    if not reset:
        return 60
    try:
        when = datetime.datetime.fromisoformat(reset.replace("Z", "+00:00"))
        now = datetime.datetime.now(datetime.timezone.utc)
        return max(5, (when - now).total_seconds() + 2)
    except (ValueError, TypeError):
        # ValueError: not parseable as ISO 8601.
        # TypeError: timestamp had no UTC offset, so it cannot be compared
        # with the timezone-aware "now".
        return 60


def rl_wait(resp):
    """If rate-limited, sleep until reset. Returns True if caller should retry.

    resp: an HTTP response exposing ``status_code`` and ``headers``.
    Returns False immediately for any status other than 429.
    """
    if resp.status_code != 429:
        return False
    wait = _reset_delay(resp.headers.get("X-RateLimit-Reset"))
    print(f" rate-limited; sleeping {int(wait)}s", flush=True)
    time.sleep(wait)
    return True
def verify():
    """Fetch the account behind the configured token.

    Calls ``/api/v1/accounts/verify_credentials`` on INSTANCE and returns the
    decoded JSON body; raises if the response has an HTTP error status.
    """
    endpoint = f"{INSTANCE}/api/v1/accounts/verify_credentials"
    response = requests.get(endpoint, headers=H, timeout=30)
    response.raise_for_status()
    return response.json()
def fetch_existing(account_id):
    """Collect the normalized text of every status the account has posted.

    Pages through the account's statuses 40 at a time (reblogs excluded,
    replies included) until an empty page comes back, retrying a page when
    rl_wait() reports a rate limit.  Returns a set of norm()-ed texts with
    the empty string removed.
    """
    endpoint = f"{INSTANCE}/api/v1/accounts/{account_id}/statuses"
    seen = set()
    cursor = None  # id of the last status on the previous page, sent as max_id
    while True:
        query = {"limit": 40, "exclude_reblogs": "true", "exclude_replies": "false"}
        if cursor:
            query["max_id"] = cursor
        page = requests.get(endpoint, headers=H, params=query, timeout=30)
        if rl_wait(page):
            continue
        page.raise_for_status()
        statuses = page.json()
        if not statuses:
            break
        seen.update(norm(strip_html(st.get("content", ""))) for st in statuses)
        cursor = statuses[-1]["id"]
        print(f" fetched {len(seen)} existing Mastodon posts...", flush=True)
    # Statuses with no text (e.g. media only) would otherwise leave "" in the set.
    seen.discard("")
    return seen
def upload_media(img):
    """Download one image and upload it to Mastodon; return the new media id.

    img: mapping with a ``"url"`` key and an optional ``"alt"`` key.  The alt
    text, truncated to 1480 characters, is sent as the media description.

    Raises ``requests.HTTPError`` when either the download or the upload
    answers with an error status.
    """
    download = requests.get(img["url"], timeout=60,
                            headers={"User-Agent": "bsky-sync/1.0"})
    # Fail here instead of uploading an error page as if it were an image.
    download.raise_for_status()
    data = download.content
    payload = {}
    if img.get("alt"):
        payload["description"] = img["alt"][:1480]
    while True:
        # A fresh stream per attempt: a retry must not reuse a consumed one.
        files = {"file": ("image.jpg", io.BytesIO(data), "image/jpeg")}
        r = requests.post(f"{INSTANCE}/api/v2/media", headers=H, files=files,
                          data=payload, timeout=120)
        if rl_wait(r):
            continue
        r.raise_for_status()
        mid = r.json()["id"]
        # 202 => processing; poll until ready (at most 30 tries, 2 s apart).
        # Best effort: the id is returned even if it never reports ready.
        if r.status_code == 202:
            for _ in range(30):
                time.sleep(2)
                rr = requests.get(f"{INSTANCE}/api/v1/media/{mid}", headers=H, timeout=30)
                if rr.status_code == 200:
                    break
        return mid
def post_status(text, media_ids):
    """Publish a public status on INSTANCE and return the decoded JSON reply.

    text: the status body.
    media_ids: ids of already-uploaded attachments; omitted from the request
    when empty.

    Re-sends the request for as long as rl_wait() reports a rate limit, and
    raises on any other HTTP error status.
    """
    form = {"status": text, "visibility": "public"}
    if media_ids:
        form["media_ids[]"] = media_ids
    endpoint = f"{INSTANCE}/api/v1/statuses"
    response = requests.post(endpoint, headers=H, data=form, timeout=60)
    while rl_wait(response):
        response = requests.post(endpoint, headers=H, data=form, timeout=60)
    response.raise_for_status()
    return response.json()
def _load_json(path):
    """Parse the JSON file at *path*, closing the handle afterwards."""
    with open(path) as fh:
        return json.load(fh)


def _save_state(posted):
    """Persist the handled Bluesky URIs to STATE_FILE without torn writes."""
    tmp = STATE_FILE + ".tmp"
    with open(tmp, "w") as fh:
        json.dump({"posted": sorted(posted)}, fh)
    # Swap the finished file in so an interrupted write cannot truncate state.
    os.replace(tmp, STATE_FILE)


def main():
    """Copy the Bluesky posts in BSKY_FILE to Mastodon.

    Skips posts whose URI is already recorded in STATE_FILE, posts whose
    normalized text already exists on the Mastodon account, and posts with
    neither text nor images.  With DRY_RUN set, prints a sample of the plan
    and returns without posting.  Otherwise posts each remaining item (at
    most 4 images each), sleeping DELAY seconds between items and recording
    progress in STATE_FILE after every post so an interrupted run resumes
    without re-posting.
    """
    posts = _load_json(BSKY_FILE)
    posted = set()
    if os.path.exists(STATE_FILE):
        posted = set(_load_json(STATE_FILE).get("posted", []))
    acct = verify()
    print(f"Authenticated as @{acct['username']} (id {acct['id']})", flush=True)
    print("Fetching existing Mastodon posts for dedup...", flush=True)
    existing = fetch_existing(acct["id"])
    print(f"Found {len(existing)} existing Mastodon posts to dedup against.\n", flush=True)
    to_post = []
    skipped_dup = 0
    for p in posts:
        if p["uri"] in posted:
            continue
        key = norm(p["text"])
        if key and key in existing:
            skipped_dup += 1
            posted.add(p["uri"])  # mark so we don't recheck
            continue
        if not p["text"].strip() and not p["images"]:
            continue
        to_post.append(p)
    print(f"Plan: {len(to_post)} to post, {skipped_dup} skipped as duplicates, "
          f"{len(posted)-skipped_dup} already done.\n", flush=True)
    if DRY_RUN:
        print("DRY_RUN=1 -> not posting. Sample of what would post:")
        for p in to_post[:10]:
            print(f" [{p['createdAt'][:10]}] imgs={len(p['images'])} {p['text'][:60]!r}")
        return
    done = 0
    for p in to_post:
        try:
            media_ids = [upload_media(im) for im in p["images"][:4]]  # Mastodon max 4
            text = p["text"]
            if not text.strip() and not media_ids:
                continue
            post_status(text, media_ids)
            posted.add(p["uri"])
            done += 1
            # Save after every post: image-only posts have no text for the
            # dedup pass to match, so a lost state entry means a re-post.
            _save_state(posted)
            print(f" posted {done}/{len(to_post)} [{p['createdAt'][:10]}] "
                  f"{p['text'][:50]!r}", flush=True)
            time.sleep(DELAY)
        except Exception as e:
            # Best effort: record progress, report, and move on to the next post.
            _save_state(posted)
            print(f" ERROR on {p['uri']}: {e} -- state saved, continuing", flush=True)
            time.sleep(DELAY)
    _save_state(posted)
    print(f"\nDONE. Posted {done} new posts. State in {STATE_FILE}", flush=True)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment