Skip to content

Instantly share code, notes, and snippets.

@erikh
Created June 23, 2026 22:04
Show Gist options
  • Select an option

  • Save erikh/6217bee7c59eaf064802e6a73157eac9 to your computer and use it in GitHub Desktop.

Select an option

Save erikh/6217bee7c59eaf064802e6a73157eac9 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""Copy Bluesky posts -> Mastodon, skipping ones whose text already exists on Mastodon.
Resumable: records posted Bluesky URIs in STATE_FILE. Honors Mastodon rate limits."""
import os, sys, json, re, time, html, io
import requests
# Configuration. Every value can be overridden from the environment; the
# defaults are the values this script has always used.

# Base URL of the target Mastodon instance (trailing "/" removed so that
# f"{INSTANCE}/api/..." never produces a double slash).
INSTANCE = os.environ.get("MASTODON_INSTANCE", "https://mastodon.social").rstrip("/")
# JSON file holding the Bluesky posts to copy.
BSKY_FILE = os.environ.get("BSKY_SYNC_POSTS_FILE", "/tmp/bsky_posts.json")
# JSON file recording which Bluesky URIs were already handled (resume state).
STATE_FILE = os.environ.get("BSKY_SYNC_STATE_FILE", "/tmp/bsky_sync_state.json")
DELAY = float(os.environ.get("SYNC_DELAY", "6"))  # seconds between posts ("slowly")
# When DRY_RUN=1 the script only prints its plan and posts nothing.
DRY_RUN = os.environ.get("DRY_RUN") == "1"
def get_token():
    """Return the Mastodon API access token, or exit if none is configured.

    The token is taken from ``$MASTODON_TOKEN`` when that is set and
    non-blank; otherwise it is read from ``~/.config/bsky-sync/token`` if
    that file exists. Surrounding whitespace is stripped in both cases,
    since the value is placed verbatim into the Authorization header.

    Raises:
        SystemExit: if neither source yields a non-empty token.
    """
    token = (os.environ.get("MASTODON_TOKEN") or "").strip()
    if not token:
        path = os.path.expanduser("~/.config/bsky-sync/token")
        if os.path.exists(path):
            # Context manager so the file handle is closed deterministically.
            with open(path, encoding="utf-8") as fh:
                token = fh.read().strip()
    if not token:
        sys.exit("ERROR: no token. Set $MASTODON_TOKEN or write ~/.config/bsky-sync/token")
    return token
# Resolved once at import time; get_token() exits the process when no token
# is configured, so nothing below runs without credentials.
TOKEN = get_token()
# Headers passed (as headers=H) to every Mastodon API request in this script.
H = {"Authorization": f"Bearer {TOKEN}", "User-Agent": "bsky-sync/1.0"}
def norm(text):
    """Return a canonical form of *text* used as the dedup key.

    http(s) URLs are removed, every run of whitespace becomes a single
    space, the ends are trimmed and the result is lowercased. HTML tags
    are not handled here; callers strip them beforehand.
    """
    without_urls = re.sub(r"https?://\S+", "", text)
    single_spaced = re.sub(r"\s+", " ", without_urls)
    return single_spaced.strip().lower()
def strip_html(s):
    """Reduce an HTML fragment to plain text.

    ``<br>`` becomes a newline, ``</p>`` becomes a blank line, all other
    tags are dropped, HTML entities are decoded, and the result is trimmed.
    """
    # Order matters: the line-break rewrites must run before the catch-all
    # tag removal, which would otherwise delete <br> and </p> outright.
    rewrites = (
        (r"<br\s*/?>", "\n", re.I),
        (r"</p>", "\n\n", re.I),
        (r"<[^>]+>", "", 0),
    )
    for pattern, replacement, flags in rewrites:
        s = re.sub(pattern, replacement, s, flags=flags)
    return html.unescape(s).strip()
def rl_wait(resp):
    """Sleep through a rate limit; return True if the caller should retry.

    Only HTTP 429 responses are treated as rate-limited. The pause runs
    until the time given in the ``X-RateLimit-Reset`` header (parsed as an
    ISO 8601 timestamp) plus 2 seconds, with a floor of 5 seconds. When the
    header is absent or cannot be parsed, the pause is 60 seconds.

    Returns:
        True after sleeping on a 429, False for any other status.
    """
    if resp.status_code != 429:
        return False

    import datetime

    delay = 60
    reset_at = resp.headers.get("X-RateLimit-Reset")
    if reset_at:
        try:
            # fromisoformat() does not accept a trailing "Z" on older
            # Pythons, so spell the UTC offset out explicitly.
            when = datetime.datetime.fromisoformat(reset_at.replace("Z", "+00:00"))
            now = datetime.datetime.now(datetime.timezone.utc)
            delay = max(5, (when - now).total_seconds() + 2)
        except Exception:
            # Unparseable (or timezone-naive) value: use the default pause.
            delay = 60
    print(f" rate-limited; sleeping {int(delay)}s", flush=True)
    time.sleep(delay)
    return True
def verify():
    """Return the decoded JSON of the ``verify_credentials`` endpoint.

    Raises via ``raise_for_status()`` when the server answers with an
    error status (for example, a rejected token).
    """
    endpoint = f"{INSTANCE}/api/v1/accounts/verify_credentials"
    response = requests.get(endpoint, headers=H, timeout=30)
    response.raise_for_status()
    return response.json()
def fetch_existing(account_id):
    """Collect the normalized text of every status the account has posted.

    Pages backwards through the account's statuses (reblogs excluded,
    replies included), 40 per request, using the id of the last status
    seen as the ``max_id`` cursor. Rate-limited requests are retried after
    waiting.

    Returns:
        A set of ``norm(strip_html(content))`` strings, without the empty
        string.
    """
    seen = set()
    cursor = None
    endpoint = f"{INSTANCE}/api/v1/accounts/{account_id}/statuses"
    while True:
        query = {"limit": 40, "exclude_reblogs": "true", "exclude_replies": "false"}
        if cursor:
            query["max_id"] = cursor
        resp = requests.get(endpoint, headers=H, params=query, timeout=30)
        if rl_wait(resp):
            # Same page again once the rate-limit window has passed.
            continue
        resp.raise_for_status()
        page = resp.json()
        if not page:
            break
        for status in page:
            seen.add(norm(strip_html(status.get("content", ""))))
            cursor = status["id"]
        print(f" fetched {len(seen)} existing Mastodon posts...", flush=True)
    # Statuses with no text (e.g. media-only) all normalize to "", which
    # must not count as a match for dedup.
    seen.discard("")
    return seen
def upload_media(img):
    """Download one image and upload it to Mastodon; return the media id.

    Args:
        img: mapping with a ``"url"`` key (where to fetch the image) and an
            optional ``"alt"`` key (description, truncated to 1480 chars).

    Returns:
        The ``"id"`` field of the Mastodon media response.

    Raises:
        requests.HTTPError: if the download or the upload returns an error
            status (via ``raise_for_status()``).
    """
    download = requests.get(img["url"], timeout=60,
                            headers={"User-Agent": "bsky-sync/1.0"})
    # Fail here instead of uploading an error page's bytes as "image.jpg".
    download.raise_for_status()
    data = download.content

    payload = {}
    if img.get("alt"):
        payload["description"] = img["alt"][:1480]

    while True:
        # A fresh BytesIO per attempt: a retry after a rate limit must not
        # start from a stream the previous attempt already consumed.
        files = {"file": ("image.jpg", io.BytesIO(data), "image/jpeg")}
        r = requests.post(f"{INSTANCE}/api/v2/media", headers=H, files=files,
                          data=payload, timeout=120)
        if rl_wait(r):
            continue
        r.raise_for_status()
        mid = r.json()["id"]
        # 202 => processing; poll until ready (30 polls, 2s apart)
        if r.status_code == 202:
            for _ in range(30):
                time.sleep(2)
                rr = requests.get(f"{INSTANCE}/api/v1/media/{mid}", headers=H, timeout=30)
                if rr.status_code == 200:
                    break
            else:
                # Keep the best-effort behavior (return the id anyway) but
                # make the timeout visible instead of silent.
                print(f" media {mid} still processing after polling; continuing",
                      flush=True)
        return mid
def post_status(text, media_ids):
    """Publish a public status and return the decoded JSON response.

    Args:
        text: the status body.
        media_ids: ids of already-uploaded attachments; may be empty.

    The request is repeated for as long as the server answers with a rate
    limit; any other error status raises via ``raise_for_status()``.
    """
    form = {"status": text, "visibility": "public"}
    if media_ids:
        form["media_ids[]"] = media_ids
    endpoint = f"{INSTANCE}/api/v1/statuses"

    resp = requests.post(endpoint, headers=H, data=form, timeout=60)
    while rl_wait(resp):
        resp = requests.post(endpoint, headers=H, data=form, timeout=60)
    resp.raise_for_status()
    return resp.json()
def _load_state():
    """Return the set of Bluesky URIs recorded in STATE_FILE (empty if none)."""
    if not os.path.exists(STATE_FILE):
        return set()
    with open(STATE_FILE, encoding="utf-8") as fh:
        state = json.load(fh)
    return set(state.get("posted", []))


def _save_state(posted):
    """Write the handled URIs to STATE_FILE via a temp file and rename."""
    tmp_path = STATE_FILE + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        json.dump({"posted": sorted(posted)}, fh)
    # Rename over the old file so an interrupted write cannot leave a
    # truncated, unparseable state file behind.
    os.replace(tmp_path, STATE_FILE)


def main():
    """Copy not-yet-posted Bluesky posts from BSKY_FILE to Mastodon.

    Each post is read with the keys "uri", "text", "images" and
    "createdAt". A post is skipped when its URI is already in the state
    file, when its normalized text matches an existing Mastodon status, or
    when it has neither text nor images. With DRY_RUN set, only the plan
    and a sample are printed. Otherwise each remaining post is published
    with up to 4 images, pausing DELAY seconds between posts.

    State is saved after every successful post and again on the way out
    (including on Ctrl-C), so a resumed run does not repeat posts. This
    matters most for posts without text, which the text dedup cannot catch.
    """
    with open(BSKY_FILE, encoding="utf-8") as fh:
        posts = json.load(fh)
    posted = _load_state()

    acct = verify()
    print(f"Authenticated as @{acct['username']} (id {acct['id']})", flush=True)
    print("Fetching existing Mastodon posts for dedup...", flush=True)
    existing = fetch_existing(acct["id"])
    print(f"Found {len(existing)} existing Mastodon posts to dedup against.\n", flush=True)

    to_post = []
    skipped_dup = 0
    for p in posts:
        if p["uri"] in posted:
            continue
        key = norm(p["text"])
        if key and key in existing:
            skipped_dup += 1
            posted.add(p["uri"])  # mark so we don't recheck
            continue
        if not p["text"].strip() and not p["images"]:
            continue
        to_post.append(p)
    print(f"Plan: {len(to_post)} to post, {skipped_dup} skipped as duplicates, "
          f"{len(posted)-skipped_dup} already done.\n", flush=True)

    if DRY_RUN:
        print("DRY_RUN=1 -> not posting. Sample of what would post:")
        for p in to_post[:10]:
            print(f" [{p['createdAt'][:10]}] imgs={len(p['images'])} {p['text'][:60]!r}")
        return

    done = 0
    try:
        for p in to_post:
            try:
                media_ids = [upload_media(im) for im in p["images"][:4]]  # Mastodon max 4
                text = p["text"]
                if not text.strip() and not media_ids:
                    continue
                post_status(text, media_ids)
                posted.add(p["uri"])
                done += 1
                # Persist immediately: the file is small, and a batched save
                # would lose progress on an interrupt.
                _save_state(posted)
                print(f" posted {done}/{len(to_post)} [{p['createdAt'][:10]}] "
                      f"{p['text'][:50]!r}", flush=True)
                time.sleep(DELAY)
            except Exception as e:
                # Per-post boundary: report, keep the state current, move on.
                _save_state(posted)
                print(f" ERROR on {p['uri']}: {e} -- state saved, continuing", flush=True)
                time.sleep(DELAY)
    finally:
        # Also reached on KeyboardInterrupt, which `except Exception` misses.
        _save_state(posted)
    print(f"\nDONE. Posted {done} new posts. State in {STATE_FILE}", flush=True)
# Run the sync only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment