Last active
May 16, 2024 23:14
-
-
Save OutRite/09ae4c231c9be44ffa6010cddcbe76d2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# CLIpclops | |
import requests | |
import json | |
import sys | |
import base64 | |
import getpass | |
pds = 'put the DM endpoint domain here' | |
handle_cache = {} | |
session = requests.Session() | |
real_pds = None | |
def resolve_did(did, session=session): | |
if did in handle_cache: | |
return handle_cache[did] | |
# there doesn't seem to be a lexicon for this, so we can't get the correct handle (ignoring xnbug) of the did according to the appview | |
headers = {'User-Agent': 'SkyBox/0.1 CLIpclops/1.1'} | |
j = None | |
if 'did:web:' in did: | |
# Resolve did:web to handle | |
try: | |
r = session.get(f'https://{did[8:]}/.well-known/did.json', headers=headers) | |
j = r.json() | |
except: | |
j = {} | |
else: | |
# Resolve did:plc to handle | |
r = session.get(f'https://plc.directory/{did}') | |
j = r.json() | |
if j and 'alsoKnownAs' in j and len(j['alsoKnownAs']) > 0 and len(j['alsoKnownAs'][0]) > 5: | |
handle = j['alsoKnownAs'][0][5:] | |
elif j and 'handle' in j: # legacy DID format | |
handle = j['handle'] | |
else: | |
handle = did | |
pds = None | |
if j and 'service' in j: | |
for service in j['service']: | |
if service['type'] == 'AtprotoPersonalDataServer': | |
pds = service['serviceEndpoint'].split('://')[-1] # We're doing [-1] here just in case the endpoint is "pds.example.com" instead of "https://pds.example.com" | |
handle = handle.replace('\n', '').replace(' ', '').replace('\x1b', '') # basic anti-me code. should be replaced with a proper character allowlist | |
if 'did:plc' in did: # attempt to account for samehandle. you can still get around this if you have a 10-char plc collision though. =( | |
handle += f' ({did[4:8+10]})' | |
else: | |
handle += f' ({did[4:]})' | |
handle_cache[did] = (handle, pds) | |
return (handle, pds) | |
def xrpc(id, data, method, pds='bsky.social', session=requests, plain_auth=None): | |
headers = {'User-Agent': 'SkyBox/0.1 CLIpclops/1.1'} | |
if plain_auth is not None: | |
headers['Authorization'] = plain_auth | |
if real_pds is not None and pds != 'bsky.social': | |
proxy_pds = pds | |
headers['atproto-proxy'] = 'did:web:'+proxy_pds+'#bsky_chat' | |
pds = real_pds | |
if method.lower() == 'get': # FIXME: There's probably a nicer way to do this with requests | |
url='https://'+pds+'/xrpc/'+id+'?' | |
for key in data: | |
url += key+'='+str(data[key])+'&' | |
url = url[:-1] | |
r=session.get(url, headers=headers) | |
else: | |
r=session.request(method, 'https://'+pds+'/xrpc/'+id, json=data, headers=headers) | |
return r.text | |
def send_message(source, target, message, embed=None): | |
r=xrpc('chat.bsky.convo.getConvoForMembers', {'members': target}, 'get', pds=pds, plain_auth=source) | |
j=json.loads(r) | |
id = j['convo']['id'] | |
# print(f"convo: {id} {source}->{target} - {message}") | |
if embed: | |
r=xrpc('chat.bsky.convo.sendMessage', {'convoId': id, 'message': {'text': message, 'embed': embed}}, 'post', pds=pds, plain_auth=source) | |
pass | |
else: | |
r=xrpc('chat.bsky.convo.sendMessage', {'convoId': id, 'message': {'text': message}}, 'post', pds=pds, plain_auth=source) | |
pass | |
def get_did(username): | |
j=json.loads(xrpc('com.atproto.identity.resolveHandle', {'handle': username}, 'get')) | |
return j['did'] | |
def menu(login_did): | |
while True: | |
j=xrpc('chat.bsky.convo.listConvos', {}, 'get', pds=pds, plain_auth=login_did) | |
j=json.loads(j) | |
ids = [] | |
if 'convos' not in j: | |
print(j) | |
mx=0 | |
else: | |
mx = len(j['convos']) | |
for convo_i in range(mx): | |
convo=j['convos'][convo_i] | |
ids.append(convo['id']) | |
members=[] | |
for member in convo['members']: | |
members.append(member['handle']) | |
print(f"[{convo_i}] - {', '.join(members)}") | |
print(f"[{mx}] - New chat") | |
print(f"[{mx+1}] - Exit") | |
x = None | |
while x is None: | |
try: | |
x=int(input('>>')) | |
except KeyboardInterrupt: | |
exit() | |
except ValueError: | |
print("err: please select from the options provided.") | |
if x < len(j['convos']): | |
id = ids[x] | |
members = j['convos'][x]['members'] | |
for member in members: | |
if 'Bearer ' in login_did: | |
# To get our DID in authenticated mode, we need to decode the JWT. | |
j=json.loads(base64.urlsafe_b64decode(login_did.split('.')[1]+'====')) # The ==== is a workaround to avoid 'Incorrect padding' | |
if member['did'] != j['sub']: | |
target = member['did'] | |
elif member['did'] != login_did: | |
target = member['did'] | |
conversation(login_did, target, id) | |
elif x == mx: | |
# New chat | |
create_conversation(login_did) | |
elif x == mx+1: | |
# Exit | |
print("Goodbye.") | |
exit() | |
def create_conversation(login_did): | |
target = input("Target: ") | |
if 'did:' not in target: | |
target = get_did(target) | |
j = xrpc('chat.bsky.convo.getConvoForMembers', {'members': target}, 'get', pds=pds, plain_auth=login_did) | |
j = json.loads(j) | |
if 'convo' not in j: | |
if 'error' in j and j['error'] == 'InvalidRequest': | |
print(f"Failed to start the conversation, {j['message']}.") | |
else: | |
print(j) | |
return | |
conversation(login_did, target, j['convo']['id']) | |
def conversation(login_did, target, id): | |
while 1: | |
j=xrpc('chat.bsky.convo.getMessages', {'convoId': id, 'limit': 50}, 'get', pds=pds, plain_auth=login_did) | |
j=json.loads(j) | |
messages=j['messages'][::-1] # this is in reverse, ig so you can get the 50 most recent or so. odd | |
for message in messages: | |
sender, _ = resolve_did(message['sender']['did']) | |
text = message['text'].replace('\x1b', '\x1b[38;5;12m\\x1b\x1b[38;0;m') | |
print(f"[{sender}] {text}") | |
sent=False | |
while not sent: | |
nm=input(f">> ").replace('\\x1b', '\x1b') | |
if nm.startswith('/'): | |
fullcomm = nm[1:].split(' ') | |
command = fullcomm[0] | |
if command == 'help': | |
print('/help - Shows this help message') | |
print('/refresh - Refreshes the chat') | |
print('/send [message] - Sends a message, including ones that start with "/"') | |
print('/quit - Exit the current chat (Alias: /exit)') | |
print('/embed [did] [rkey] - Embeds a post into the chat') | |
elif command == 'refresh': | |
sent=True | |
elif command == 'send': | |
send_message(login_did, target, ' '.join(fullcomm[1:])) | |
sent=True | |
elif command == 'quit' or command == 'exit': | |
return | |
elif command == 'embed': | |
at_uri = f"at://{fullcomm[1]}/app.bsky.feed.post/{fullcomm[2]}" | |
cid='bafyreibidezzodaxz3acl2ovg576u5hixzs7re4qjtfgidiqgbks2plw5i' # random CID since it probably doesn't matter | |
ref = {'$type':'com.atproto.repo.strongRef', 'uri': at_uri, 'cid': cid} | |
embed={'$type': 'app.bsky.embed.record', 'record': ref} | |
send_message(login_did, target, '', embed=embed) | |
else: | |
if len(nm)>0: | |
send_message(login_did, target, nm) | |
sent=True | |
#new_messages, cursor = get_new_messages(login_did, id, cursor) # TODO: actually add this so we don't need to redisplay all messages =) | |
if __name__ == '__main__': | |
print("CLIpclops - Bsky DM client") | |
print("Use /help in any conversation to access the help menu") | |
if len(sys.argv)>1: | |
login_did = sys.argv[1] | |
else: | |
login_did = input("Username/DID: ") | |
if 'did:' not in login_did: | |
# Handle? | |
login_did = get_did(login_did) | |
# Check if the API supports did-only login | |
r = xrpc('chat.bsky.convo.listConvos', {}, 'get', pds=pds, plain_auth=login_did) | |
j = json.loads(r) | |
if 'error' in j and j['error'] == 'AuthenticationRequired': | |
# An actual login is required to use this API | |
print("This service requires you to login.") | |
password = getpass.getpass("Enter your app password: ") | |
# First, we need to get the PDS of the user. | |
_, user_pds = resolve_did(login_did) | |
if user_pds is None: | |
print("This account doesn't seem to have a PDS attached. Did you type your DID or handle correctly?") | |
exit() | |
# Next, we need to login with the password. | |
r = xrpc('com.atproto.server.createSession', {'identifier': login_did, 'password': password}, 'post', pds=user_pds) | |
j = json.loads(r) | |
if 'accessJwt' not in j: | |
# Login failed. Incorrect password? | |
print("Failed to login. Please double-check your username and password.") | |
exit() | |
token = j['accessJwt'] | |
login_did = 'Bearer '+token | |
real_pds = user_pds | |
menu(login_did) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment