Last active
November 8, 2024 13:06
-
-
Save adamghill/4225de4c08d03a361ddc4241486b7ac3 to your computer and use it in GitHub Desktop.
Convert a starter pack to a list for Bluesky
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Python port of https://github.com/sbm12/bsky-Pack2List/blob/main/bskyList2StarterPack.php with some fixes / features
# Handle is username or email address
# Password is the regular login password or a specific app password
import json | |
from datetime import datetime | |
import requests | |
class BlueskyApi:
    """Minimal client for the Bluesky XRPC HTTP API.

    When both ``handle`` and ``app_password`` are supplied, the constructor
    logs in through ``com.atproto.server.createSession`` and keeps the
    returned DID and access token for subsequent requests.
    """

    # Seconds to wait for the server before giving up; without a timeout a
    # stalled connection would block the script forever.
    timeout = 30

    def __init__(self, handle=None, app_password=None, api_uri="https://bsky.social/xrpc/"):
        """Create a client and, if credentials are given, open a session.

        Args:
            handle: Username or email address used to log in.
            app_password: Account password or app password.
            api_uri: Base URL that endpoint names are appended to.

        Raises:
            requests.HTTPError: If the login request returns an error status.
        """
        self.api_uri = api_uri
        self.account_did = None
        self.api_key = None

        if handle and app_password:
            # Get DID and API key from handle and app password
            args = {"identifier": handle, "password": app_password}
            # ``request`` returns None for a non-200 success status; fall back
            # to an empty dict so a missing body leaves the client logged out
            # instead of raising AttributeError.
            data = self.request("POST", "com.atproto.server.createSession", args) or {}
            self.account_did = data.get("did")
            self.api_key = data.get("accessJwt")

    def get_account_did(self):
        """Return the DID of the logged-in account, or None."""
        return self.account_did

    def set_account_did(self, account_did):
        """Override the account DID used as the ``repo`` for writes."""
        self.account_did = account_did

    def set_api_key(self, api_key):
        """Set the bearer token sent in the Authorization header."""
        self.api_key = api_key

    def has_api_key(self):
        """Return True when a bearer token is available."""
        return self.api_key is not None

    def request(self, method, endpoint, args=None, body=None, content_type=None):
        """Send one XRPC request and return the decoded JSON response.

        Args:
            method: HTTP method, e.g. ``"GET"`` or ``"POST"``.
            endpoint: XRPC method name appended to ``api_uri``.
            args: For GET, the query parameters. For JSON requests, the
                payload that is serialized into the request body.
            body: Raw request body; replaced by the serialized ``args`` when
                the content type is JSON and ``args`` is non-empty.
            content_type: Explicit Content-Type; POST defaults to JSON.

        Returns:
            The parsed JSON body on HTTP 200, otherwise None.

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
        """
        url = f"{self.api_uri}{endpoint}"

        headers = {}
        if self.api_key:
            headers["Authorization"] = f"Bearer {self.api_key}"

        # Let requests build the query string so values such as at:// URIs
        # and cursors are URL-encoded correctly.
        params = args if method == "GET" and args else None

        if method == "POST" and not content_type:
            content_type = "application/json"

        if content_type == "application/json" and args:
            body = json.dumps(args)

        if content_type:
            headers["Content-Type"] = content_type

        # TLS verification stays at the library default (enabled): these
        # requests carry the account password and the bearer token.
        response = requests.request(
            method,
            url,
            headers=headers,
            params=params,
            data=body,
            timeout=self.timeout,
        )

        if response.status_code == 200:
            return response.json()

        response.raise_for_status()
        return None
def convert_list_to_starter_pack(bluesky, starterpack_at, list_at):
    """Copy every member of a list into a starter pack's backing list.

    Args:
        bluesky: Client exposing ``request(method, endpoint, args)`` and
            ``get_account_did()``.
        starterpack_at: AT-URI of the destination list (the starter pack's list).
        list_at: AT-URI of the source list to read members from.
    """
    cursor = None
    while True:
        # Read the source list for accounts to add to the starter pack list.
        # The API returns at most 100 items per call, so follow the cursor
        # until every page has been copied.
        query = {"list": list_at, "limit": 100}
        if cursor:
            query["cursor"] = cursor
        page = bluesky.request("GET", "app.bsky.graph.getList", query) or {}
        items = page.get("items", [])

        for list_item in items:
            args = {
                "collection": "app.bsky.graph.listitem",
                "repo": bluesky.get_account_did(),
                "record": {
                    "createdAt": datetime.now().astimezone().replace(microsecond=0).isoformat(),
                    "$type": "app.bsky.graph.listitem",
                    "subject": list_item["subject"]["did"],
                    "list": starterpack_at,
                },
            }
            bluesky.request("POST", "com.atproto.repo.createRecord", args)

        next_cursor = page.get("cursor")
        # Stop on the last page; the extra checks guard against a server
        # that keeps returning the same cursor or an empty page.
        if not next_cursor or not items or next_cursor == cursor:
            break
        cursor = next_cursor
def convert_starter_pack_to_list(bluesky, starterpack_at, list_at):
    """Copy every member of a starter pack's backing list into a list.

    Args:
        bluesky: Client exposing ``request(method, endpoint, args)`` and
            ``get_account_did()``.
        starterpack_at: AT-URI of the source list (the starter pack's list).
        list_at: AT-URI of the destination list that receives the members.
    """
    cursor = None
    while True:
        # Read the source starter pack to add to the list. The API returns
        # at most 100 items per call, so follow the cursor until every page
        # has been copied.
        query = {"list": starterpack_at, "limit": 100}
        if cursor:
            query["cursor"] = cursor
        page = bluesky.request("GET", "app.bsky.graph.getList", query) or {}
        items = page.get("items", [])

        for pack_item in items:
            args = {
                "collection": "app.bsky.graph.listitem",
                "repo": bluesky.get_account_did(),
                "record": {
                    "createdAt": datetime.now().astimezone().replace(microsecond=0).isoformat(),
                    "$type": "app.bsky.graph.listitem",
                    "subject": pack_item["subject"]["did"],
                    "list": list_at,
                },
            }
            bluesky.request("POST", "com.atproto.repo.createRecord", args)

        next_cursor = page.get("cursor")
        # Stop on the last page; the extra checks guard against a server
        # that keeps returning the same cursor or an empty page.
        if not next_cursor or not items or next_cursor == cursor:
            break
        cursor = next_cursor
def get_starter_pack_at(bluesky, user_handle, starter_pack_id):
    """Return the AT-URI of the list behind one of a user's starter packs.

    Args:
        bluesky: Client exposing ``request(method, endpoint, args)``.
        user_handle: Handle (or DID) of the account that owns the pack.
        starter_pack_id: Record key of the pack, i.e. the last path segment
            of its URI.

    Returns:
        The list URI of the matching starter pack, or None when the pack is
        not found or has no list attached.
    """
    cursor = None
    while True:
        # Page through all of the actor's starter packs rather than only
        # the first batch the API returns.
        query = {"actor": user_handle, "limit": 100}
        if cursor:
            query["cursor"] = cursor
        page = bluesky.request("GET", "app.bsky.graph.getActorStarterPacks", query) or {}
        packs = page.get("starterPacks", [])

        for item in packs:
            pack_uri = item["uri"]
            pack_code = pack_uri.split("/")[-1]
            if pack_code == starter_pack_id:
                detail = bluesky.request("GET", "app.bsky.graph.getStarterPack", {"starterPack": pack_uri}) or {}
                # A pack without a backing list yields None instead of KeyError.
                pack_list = (detail.get("starterPack") or {}).get("list") or {}
                return pack_list.get("uri")

        next_cursor = page.get("cursor")
        if not next_cursor or not packs or next_cursor == cursor:
            return None
        cursor = next_cursor
def get_list_at(bluesky, user_handle, list_id):
    """Return the AT-URI of one of a user's lists.

    Args:
        bluesky: Client exposing ``request(method, endpoint, args)``.
        user_handle: Handle (or DID) of the account that owns the list.
        list_id: Record key of the list, i.e. the last path segment of its URI.

    Returns:
        The matching list URI, or None when the user has no such list.
    """
    cursor = None
    while True:
        # The API returns at most 100 lists per call; follow the cursor so
        # lists beyond the first page can still be found.
        query = {"actor": user_handle, "limit": 100}
        if cursor:
            query["cursor"] = cursor
        page = bluesky.request("GET", "app.bsky.graph.getLists", query) or {}
        lists = page.get("lists", [])

        for item in lists:
            list_uri = item["uri"]
            list_code = list_uri.split("/")[-1]
            if list_code == list_id:
                return list_uri

        next_cursor = page.get("cursor")
        if not next_cursor or not lists or next_cursor == cursor:
            return None
        cursor = next_cursor
def main():
    """Prompt for credentials and URLs, then copy a starter pack into a list.

    Expects a starter pack URL ending in ``.../<handle>/<pack id>`` and a
    list URL ending in ``.../<handle>/lists/<list id>``. Progress and errors
    are reported on standard output.
    """
    # Local import: only this interactive entry point needs it.
    from getpass import getpass

    handle = input("Bluesky email or username: ").strip()
    # Read the password without echoing it to the terminal.
    app_password = getpass("Bluesky account or app password: ")
    pack_url = input("Starter pack url: ")
    list_url = input("List url: ")

    login_error = (
        "Error connecting to your account. Please check the handle (either a username or email address) and password and try again."
    )

    try:
        bluesky = BlueskyApi(handle=handle, app_password=app_password)
    except Exception as exc:
        # Top-level boundary: report the failure instead of hiding it, while
        # letting KeyboardInterrupt/SystemExit propagate.
        print(login_error)
        print(f"Details: {exc}")
        return

    # An empty handle or password skips the login entirely, so a client
    # object alone does not prove the session was created.
    if not bluesky.has_api_key():
        print(login_error)
        return

    pack_parts = pack_url.strip().strip("/").split("/")
    list_parts = list_url.strip().strip("/").split("/")
    if len(pack_parts) < 2 or len(list_parts) < 3:
        print("Could not understand those URLs. Please check them and try again.")
        return

    user_handle = pack_parts[-2]
    starter_pack_id = pack_parts[-1]
    list_user_handle = list_parts[-3]
    list_id = list_parts[-1]

    starter_pack_at = get_starter_pack_at(bluesky, user_handle, starter_pack_id)
    if not starter_pack_at:
        print("Could not find that Starter Pack. Please check the URL and try again.")
        return

    list_at = get_list_at(bluesky, list_user_handle, list_id)
    if not list_at:
        print("Could not find that List. Please check the URL and try again.")
        return

    convert_starter_pack_to_list(bluesky, starter_pack_at, list_at)
    print("Import Complete")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment