Created October 23, 2025 01:50
-
-
Save DavidBuchanan314/04a08d422f1ed868f95674f526868ed2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import sys | |
| from typing import BinaryIO | |
| import requests | |
| import time | |
| from cbrrr import CID, encode_dag_cbor, decode_dag_cbor | |
| from atmst.mst.node_store import NodeStore | |
| from atmst.mst.node_walker import NodeWalker | |
| from atmst.blockstore.car_file import ReadOnlyCARBlockStore, encode_varint | |
| from atmst.blockstore import BlockStore, OverlayBlockStore | |
class XRPCBlockStore(BlockStore):
    """Read-only BlockStore that fetches individual blocks from a PDS over XRPC.

    Each ``get_block`` call issues one ``com.atproto.sync.getBlocks`` request
    for a single CID and extracts that block from the CARv1 response.
    ``blocks_read`` counts blocks successfully fetched (used for progress stats).
    Writes and deletes are rejected with ``NotImplementedError``.
    """

    def __init__(self, did: str, pds_url: str, timeout: float = 30.0):
        """
        Args:
            did: repo DID passed as the ``did`` query parameter.
            pds_url: base URL of the PDS; trailing slashes are ignored.
            timeout: per-request timeout in seconds passed to ``requests``
                (``None`` waits forever).
        """
        self.did = did
        # Strip trailing slashes so the join below can't produce "//xrpc/...".
        self.pds_url = pds_url.rstrip("/")
        self.timeout = timeout
        self.session = requests.Session()
        self.blocks_read = 0

    def put_block(self, key: bytes, value: bytes) -> None:
        raise NotImplementedError("XRPCBlockStore is read-only")

    def get_block(self, key: bytes) -> bytes:
        """Fetch the block whose binary CID is *key* from the PDS.

        Raises:
            requests.HTTPError: if the PDS responds with an error status.
            KeyError: if the response CAR does not contain the requested block.
            ValueError: if the response is not a well-formed CARv1 stream.
        """
        res = self.session.get(
            self.pds_url + "/xrpc/com.atproto.sync.getBlocks",
            params={"did": self.did, "cids": CID(key).encode()},
            timeout=self.timeout,
        )
        res.raise_for_status()
        value = self._extract_block(res.content, key)
        self.blocks_read += 1
        return value

    def del_block(self, key: bytes) -> None:
        raise NotImplementedError("XRPCBlockStore is read-only")

    @staticmethod
    def _read_varint(buf: bytes, offset: int) -> "tuple[int, int]":
        """Decode an unsigned LEB128 varint at *offset*; return (value, next_offset)."""
        value = 0
        shift = 0
        while True:
            if offset >= len(buf):
                raise ValueError("truncated varint in CAR response")
            byte = buf[offset]
            offset += 1
            value |= (byte & 0x7F) << shift
            if not byte & 0x80:
                return value, offset
            shift += 7

    @classmethod
    def _extract_block(cls, car: bytes, key: bytes) -> bytes:
        """Return the data of the section whose CID bytes equal *key* in CARv1 *car*.

        A CARv1 stream is a varint-prefixed dag-cbor header followed by
        varint-prefixed sections of ``CID bytes || block data``. The header is
        skipped explicitly because its ``roots`` list may contain the same CID
        bytes, which would confuse a naive byte search.
        """
        header_len, pos = cls._read_varint(car, 0)
        pos += header_len
        while pos < len(car):
            section_len, pos = cls._read_varint(car, pos)
            section = car[pos:pos + section_len]
            if len(section) != section_len:
                raise ValueError("truncated block section in CAR response")
            pos += section_len
            if section.startswith(key):
                return section[len(key):]
        raise KeyError(key)
def write_block(file: BinaryIO, data: bytes) -> None:
    """Append one length-delimited CAR section to *file*.

    The section is the varint encoding of ``len(data)`` immediately
    followed by *data* itself.
    """
    length_prefix = encode_varint(len(data))
    file.writelines((length_prefix, data))
def _print_progress(record_count: int, fetched: int) -> None:
    """Overwrite the current terminal line with the running progress counters."""
    print(
        f"\rwritten {record_count} records. fetched {fetched} new blocks.",
        end="",
        flush=True,  # no trailing newline, so flush explicitly or nothing shows
    )


def main():
    """Write a CAR of a repo's latest commit, reusing blocks from an older CAR.

    Usage: carsync <src_car> <dst_car> <pds_url>

    The repo DID is read from the root commit of <src_car>. The latest commit
    CID is fetched from <pds_url> via ``com.atproto.sync.getLatestCommit``.
    Blocks are then looked up in <src_car> first and fetched from the PDS only
    when missing. The resulting CAR, rooted at the latest commit, is written
    to <dst_car>.
    """
    if len(sys.argv) != 4:
        print("Usage: carsync <src_car> <dst_car> <pds_url>", file=sys.stderr)
        sys.exit(1)
    src_path, dst_path, pds_url = sys.argv[1:]
    pds_url = pds_url.rstrip("/")  # avoid "//xrpc/..." when a slash is given

    with open(src_path, "rb") as carfile:
        bs1 = ReadOnlyCARBlockStore(carfile)

        # Figure out the DID by inspecting the source CAR's root commit.
        commit = decode_dag_cbor(bs1.get_block(bytes(bs1.car_root)))
        did = commit.get("did") if isinstance(commit, dict) else None
        if not isinstance(did, str):
            sys.exit(f"{src_path}: root block is not a commit with a string 'did'")

        # Overlay: the source CAR is consulted first, then the PDS over XRPC.
        bs2 = XRPCBlockStore(did, pds_url)
        bs = OverlayBlockStore(bs1, bs2)

        # Find the *latest* commit (reusing bs2's session; same host).
        r = bs2.session.get(
            pds_url + "/xrpc/com.atproto.sync.getLatestCommit",
            params={"did": did},
            timeout=30,
        )
        r.raise_for_status()
        new_commit_cid = CID.decode(r.json()["cid"])
        # Keep the raw bytes: they are exactly what the CID hashes, whereas
        # re-encoding the decoded object is not guaranteed to round-trip.
        new_commit_bytes = bs.get_block(bytes(new_commit_cid))
        new_commit = decode_dag_cbor(new_commit_bytes)
        new_root = new_commit.get("data") if isinstance(new_commit, dict) else None
        if not isinstance(new_root, CID):
            sys.exit(f"latest commit {new_commit_cid.encode()} has no 'data' CID")

        # For progress stats.
        record_count = 0
        start_time = time.monotonic()
        last_report = 0.0
        # Binary CIDs already written; identical records share a CID, so
        # repeats are neither re-fetched nor written twice.
        written = {bytes(new_commit_cid)}

        with open(dst_path, "wb") as carfile_out:
            new_header = encode_dag_cbor({
                "version": 1,
                "roots": [new_commit_cid],
            })
            write_block(carfile_out, new_header)
            write_block(carfile_out, bytes(new_commit_cid) + new_commit_bytes)

            for node in NodeWalker(NodeStore(bs), new_root).iter_nodes():
                node_key = bytes(node.cid)
                if node_key not in written:
                    written.add(node_key)
                    write_block(carfile_out, node_key + node.serialised)
                for v in node.vals:
                    v_key = bytes(v)
                    if v_key not in written:
                        written.add(v_key)
                        write_block(carfile_out, v_key + bs.get_block(v_key))
                    record_count += 1
                    # Throttle terminal updates to ~10/s.
                    now = time.monotonic()
                    if now - last_report >= 0.1:
                        last_report = now
                        _print_progress(record_count, bs2.blocks_read)

            _print_progress(record_count, bs2.blocks_read)  # final exact counts

    print(f"\ndone in {time.monotonic() - start_time:.1f} seconds")
if __name__ == "__main__":
    # Script entry point; main() returns None, so this exits with status 0.
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.