Skip to content

Instantly share code, notes, and snippets.

@DavidBuchanan314
Created October 23, 2025 01:50
Show Gist options
  • Save DavidBuchanan314/04a08d422f1ed868f95674f526868ed2 to your computer and use it in GitHub Desktop.
Save DavidBuchanan314/04a08d422f1ed868f95674f526868ed2 to your computer and use it in GitHub Desktop.
import sys
from typing import BinaryIO
import requests
import time
from cbrrr import CID, encode_dag_cbor, decode_dag_cbor
from atmst.mst.node_store import NodeStore
from atmst.mst.node_walker import NodeWalker
from atmst.blockstore.car_file import ReadOnlyCARBlockStore, encode_varint
from atmst.blockstore import BlockStore, OverlayBlockStore
class XRPCBlockStore(BlockStore):
    """Read-only block store that fetches blocks from a PDS over XRPC.

    Every lookup issues one ``com.atproto.sync.getBlocks`` request for a
    single CID.  ``blocks_read`` counts the lookups that returned a block.
    """

    def __init__(self, did: str, pds_url: str):
        """Remember the repo *did* and *pds_url* and open an HTTP session."""
        self.did = did
        self.pds_url = pds_url
        self.session = requests.Session()
        self.blocks_read = 0  # number of blocks successfully fetched so far

    def put_block(self, key: bytes, value: bytes) -> None:
        """Unsupported: always raises ``NotImplementedError``."""
        raise NotImplementedError("XRPCBlockStore is read-only")

    def get_block(self, key: bytes) -> bytes:
        """Fetch the block whose binary CID is *key* from the PDS.

        Raises:
            requests.HTTPError: the PDS answered with an error status.
            KeyError: the response does not contain the requested CID.
        """
        cid_str = CID(key).encode()
        res = self.session.get(
            self.pds_url + "/xrpc/com.atproto.sync.getBlocks",
            params={
                "did": self.did,
                "cids": cid_str,
            },
        )
        res.raise_for_status()

        # HACK: rather than parsing the CAR response properly, take everything
        # after the first occurrence of the CID bytes.  This is only correct
        # when the response holds exactly one block and the CID bytes do not
        # also appear earlier in the response (e.g. in the CAR header).
        _, found, block_data = res.content.partition(key)
        if not found:
            # Without this check a response lacking the block would silently
            # yield b"" and the caller would store an empty block.
            # NOTE(review): KeyError is assumed to be the "missing block"
            # signal other BlockStore implementations use -- confirm.
            raise KeyError(f"block {cid_str} not found in getBlocks response")

        self.blocks_read += 1
        return block_data

    def del_block(self, key: bytes) -> None:
        """Unsupported: always raises ``NotImplementedError``."""
        raise NotImplementedError("XRPCBlockStore is read-only")
def write_block(file: BinaryIO, data: bytes) -> None:
    """Write *data* to *file*, preceded by its varint-encoded byte length."""
    length_prefix = encode_varint(len(data))
    for chunk in (length_prefix, data):
        file.write(chunk)
def _require_type(value, expected_type, what: str):
    """Return *value* unchanged, or raise TypeError if it is not *expected_type*."""
    if not isinstance(value, expected_type):
        raise TypeError(
            f"{what}: expected {expected_type.__name__}, "
            f"got {type(value).__name__}"
        )
    return value


def main():
    """Command-line entry point: bring a repo CAR file up to date.

    Usage: ``carsync <src_car> <dst_car> <pds_url>``

    Reads the repo DID from the root commit of ``src_car``, asks the PDS for
    the latest commit, then walks the MST of that commit and writes the
    commit, every MST node and every record block to ``dst_car``.  Blocks are
    looked up through an overlay of the source CAR and an XRPC-backed store
    (presumably local first, remote as fallback -- the overlay order is
    defined by ``OverlayBlockStore``).

    Exits with status 1 when the argument count is wrong.

    Raises:
        TypeError: a decoded commit or one of its fields has the wrong type.
        KeyError: a commit lacks the ``did`` or ``data`` field.
        requests.HTTPError: the PDS answered with an error status.
    """
    if len(sys.argv) != 4:
        print("Usage: carsync <src_car> <dst_car> <pds_url>", file=sys.stderr)
        sys.exit(1)

    src_path, dst_path, pds_url = sys.argv[1:]
    # Endpoint paths are appended as "/xrpc/..."; avoid producing "//xrpc".
    pds_url = pds_url.rstrip("/")

    # The source CAR must stay open for the whole run: blocks are read from
    # it on demand while the new tree is walked.
    with open(src_path, "rb") as carfile:
        bs1 = ReadOnlyCARBlockStore(carfile)

        # figure out the DID by inspecting the source CAR
        commit = _require_type(
            decode_dag_cbor(bs1.get_block(bytes(bs1.car_root))),
            dict,
            "source CAR root commit",
        )
        did = _require_type(commit["did"], str, "source commit 'did'")

        # set up the XRPC-backed overlay block store
        bs2 = XRPCBlockStore(did, pds_url)
        bs = OverlayBlockStore(bs1, bs2)

        # Reuse the block store's HTTP session for the one extra request
        # below, and make sure it is closed when we are done.
        with bs2.session as http:
            # find the *latest* commit
            r = http.get(
                pds_url + "/xrpc/com.atproto.sync.getLatestCommit",
                params={"did": did},
            )
            r.raise_for_status()
            new_commit_cid = CID.decode(r.json()["cid"])

            # Keep the raw bytes: they are what the CID refers to, so they
            # are written out verbatim instead of being re-encoded.
            new_commit_bytes = bs.get_block(bytes(new_commit_cid))
            new_commit = _require_type(
                decode_dag_cbor(new_commit_bytes), dict, "latest commit"
            )
            new_root = _require_type(
                new_commit["data"], CID, "latest commit 'data'"
            )

            # for progress stats
            record_count = 0
            start_time = time.monotonic()

            with open(dst_path, "wb") as carfile_out:
                new_header = encode_dag_cbor({
                    "version": 1,
                    "roots": [new_commit_cid],
                })
                write_block(carfile_out, new_header)
                write_block(carfile_out, bytes(new_commit_cid) + new_commit_bytes)

                for node in NodeWalker(NodeStore(bs), new_root).iter_nodes():
                    write_block(carfile_out, bytes(node.cid) + node.serialised)
                    for v in node.vals:
                        write_block(carfile_out, bytes(v) + bs.get_block(bytes(v)))
                        record_count += 1
                    print(
                        f"\rwritten {record_count} records. "
                        f"fetched {bs2.blocks_read} new blocks.",
                        end="",
                    )

    print(f"\ndone in {time.monotonic() - start_time:.1f} seconds")
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment