#!/usr/bin/env python3
"""Extract Slack Desktop credentials and store them in macOS Keychain.

Reads xoxc tokens from Slack Desktop's LevelDB and decrypts the xoxd
cookie from its SQLite Cookies DB. Stores results in Keychain under the
service name "slack-listen" so slack-listen.py can use them.

No external dependencies beyond Python stdlib + cryptography (pre-installed).

Usage:
    slack-auth.py            # extract and store in keychain
    slack-auth.py --check    # verify stored credentials still work
    slack-auth.py --show     # print stored credentials (redacted)

Inspired by: github.com/stablyai/agent-slack, github.com/hraftery/slacktokens
"""
|
|
|
import argparse |
|
import hashlib |
|
import json |
|
import os |
|
import re |
|
import shutil |
|
import sqlite3 |
|
import struct |
|
import subprocess |
|
import sys |
|
import tempfile |
|
import urllib.parse |
|
import urllib.request |
|
|
|
# Keychain service name under which this script stores what it extracts.
KEYCHAIN_SERVICE = "slack-listen"

# Slack Desktop data directory for the Electron build.
SLACK_DIR = os.path.expanduser("~/Library/Application Support/Slack")

# Slack Desktop data directory for the App Store build.
SLACK_DIR_APPSTORE = os.path.expanduser(
    "~/Library/Containers/com.tinyspeck.slackmacgap/Data/Library/Application Support/Slack"
)
|
|
|
|
|
# --- Keychain helpers --- |
|
|
|
def keychain_set(account, value):
    """Store *value* in the macOS Keychain under KEYCHAIN_SERVICE / *account*.

    An existing entry for the same service and account is deleted first, so
    the call acts as an overwrite.

    Raises:
        subprocess.CalledProcessError: if ``security add-generic-password``
            exits with a non-zero status.
    """
    selector = ["-s", KEYCHAIN_SERVICE, "-a", account]

    # Best-effort removal: the exit status is ignored on purpose, because the
    # entry may simply not exist yet.
    subprocess.run(
        ["security", "delete-generic-password", *selector],
        capture_output=True,
    )

    add_command = ["security", "add-generic-password", *selector, "-w", value]
    subprocess.check_call(
        add_command,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
|
|
|
|
|
def keychain_get(account):
    """Return the secret stored for *account* under KEYCHAIN_SERVICE.

    Returns:
        The stored value with surrounding whitespace stripped, or ``None``
        when the ``security`` lookup exits with a non-zero status.
    """
    lookup = [
        "security", "find-generic-password",
        "-s", KEYCHAIN_SERVICE,
        "-a", account,
        "-w",
    ]
    try:
        raw = subprocess.check_output(lookup, stderr=subprocess.DEVNULL)
    except subprocess.CalledProcessError:
        return None
    return raw.decode().strip()
|
|
|
|
|
def get_safe_storage_password():
    """Look up Slack's "Slack Safe Storage" password in the Keychain.

    The lookup is attempted with the account names "Slack Key" and
    "Slack App Store Key", and finally without any account filter; the first
    lookup that succeeds wins.

    Returns:
        The password with surrounding whitespace stripped, or ``None`` when
        every lookup exits with a non-zero status.
    """
    service = "Slack Safe Storage"
    for account in ("Slack Key", "Slack App Store Key", None):
        command = ["security", "find-generic-password", "-w", "-s", service]
        if account:
            command += ["-a", account]
        try:
            output = subprocess.check_output(command, stderr=subprocess.DEVNULL)
        except subprocess.CalledProcessError:
            continue
        return output.decode().strip()
    return None
|
|
|
|
|
# --- LevelDB log file reader (no compression needed) --- |
|
|
|
def read_log_records(data):
    """Iterate over the logical records stored in a LevelDB ``.log`` file.

    The file is treated as a sequence of 32768-byte blocks. Each physical
    record has a 7-byte header: the first four bytes are not inspected, bytes
    4-5 hold the little-endian payload length and byte 6 the record type
    (1 = FULL, 2 = FIRST, 3 = MIDDLE, 4 = LAST). FIRST/MIDDLE/LAST fragments
    are joined into one payload before being yielded.

    Args:
        data: the raw bytes of the log file.

    Yields:
        The payload (``bytes``) of every complete logical record.
    """
    BLOCK_SIZE = 32768
    HEADER_SIZE = 7

    total = len(data)
    pos = 0
    fragments = []

    while pos < total:
        left_in_block = BLOCK_SIZE - (pos % BLOCK_SIZE)

        # A block tail too short for a header is padding; jump to next block.
        if left_in_block < HEADER_SIZE:
            pos += left_in_block
            continue

        if pos + HEADER_SIZE > total:
            break

        (payload_len,) = struct.unpack_from("<H", data, pos + 4)
        kind = data[pos + 6]
        pos += HEADER_SIZE

        # Empty or truncated record: abandon the rest of this block together
        # with any partially assembled record.
        if payload_len == 0 or pos + payload_len > total:
            pos += max(0, left_in_block - HEADER_SIZE)
            fragments = []
            continue

        payload = data[pos:pos + payload_len]
        pos += payload_len

        if kind == 1:  # FULL
            fragments = []
            yield payload
        elif kind == 2:  # FIRST
            fragments = [payload]
        elif kind in (3, 4) and fragments:  # MIDDLE / LAST of an open record
            fragments.append(payload)
            if kind == 4:
                yield b"".join(fragments)
                fragments = []
|
|
|
|
|
def parse_log_batch(batch):
    """Parse a LevelDB write batch and yield ``(key, value)`` for each put.

    The first 12 bytes (8-byte sequence number + 4-byte count) are skipped.
    Each following entry starts with a type byte: 1 is a value record
    (varint key length, key, varint value length, value) and 0 is a deletion
    (varint key length, key), which is skipped. Parsing is best-effort: it
    stops silently at the first unknown type byte or undecodable varint
    instead of raising, so one corrupt batch cannot abort a whole scan.

    Args:
        batch: the raw bytes of one log record.

    Yields:
        ``(key, value)`` tuples of ``bytes`` for every value record.
    """
    HEADER_SIZE = 12  # sequence(8) + count(4)
    if len(batch) < HEADER_SIZE:
        return

    offset = HEADER_SIZE
    while offset < len(batch):
        record_type = batch[offset]
        offset += 1

        if record_type not in (0, 1):
            return

        try:
            key_len, n = read_varint(batch, offset)
            offset += n
            key = batch[offset:offset + key_len]
            offset += key_len

            if record_type == 0:  # Deletion: nothing to report
                continue

            val_len, n = read_varint(batch, offset)
        except ValueError:
            # Truncated or corrupt length prefix: treat like an unknown
            # record type and stop, rather than crashing the caller.
            return

        offset += n
        value = batch[offset:offset + val_len]
        offset += val_len
        yield key, value
|
|
|
|
|
def read_varint(buf, offset):
    """Decode a base-128 varint from *buf* starting at *offset*.

    Each byte contributes its low seven bits, least-significant group first;
    a clear high bit marks the final byte. At most five bytes are read.

    Returns:
        ``(value, bytes_consumed)``.

    Raises:
        ValueError: if the buffer ends before a terminating byte is seen, or
            five bytes are read without one.
    """
    value = 0
    consumed = 0
    for shift in range(0, 35, 7):
        position = offset + consumed
        if position >= len(buf):
            break
        byte = buf[position]
        consumed += 1
        value |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return value, consumed
    raise ValueError("bad varint")
|
|
|
|
|
def _snapshot_leveldb(leveldb_dir, dest):
    """Copy *leveldb_dir* to *dest* and remove the copied LOCK file.

    ``cp -cR`` is tried first. If it cannot be run or exits non-zero, any
    partial copy is discarded and ``shutil.copytree`` is used instead.
    """
    try:
        proc = subprocess.run(["cp", "-cR", leveldb_dir, dest], capture_output=True)
        copied = proc.returncode == 0
    except OSError:
        copied = False

    if not copied:
        shutil.rmtree(dest, ignore_errors=True)
        shutil.copytree(leveldb_dir, dest)

    lock = os.path.join(dest, "LOCK")
    if os.path.exists(lock):
        os.unlink(lock)


def _find_local_config(db_dir):
    """Return the raw bytes to search for team data, or ``None``.

    ``.log`` files are scanned newest-first (by file name). Within a log the
    last non-empty value written under a key containing ``localConfig_v``
    wins, since later records supersede earlier ones. If no log holds such a
    value, every ``.ldb`` file mentioning ``localConfig_v`` is concatenated
    (NUL-separated) for regex extraction.
    """
    names = sorted(os.listdir(db_dir))

    for fname in reversed(names):
        if not fname.endswith(".log"):
            continue
        with open(os.path.join(db_dir, fname), "rb") as f:
            data = f.read()
        latest = None
        for record in read_log_records(data):
            try:
                for key, value in parse_log_batch(record):
                    if value and b"localConfig_v" in key:
                        latest = value
            except ValueError:
                # Corrupt batch: keep what was parsed so far and move on.
                continue
        if latest:
            return latest

    ldb_bufs = []
    for fname in names:
        if not fname.endswith(".ldb"):
            continue
        with open(os.path.join(db_dir, fname), "rb") as f:
            raw = f.read()
        if b"localConfig_v" in raw:
            ldb_bufs.append(raw)
    return b"\x00".join(ldb_bufs) if ldb_bufs else None


def _nearest_before(hits, limit):
    """Return the value of the last ``(start, value)`` hit with start < limit.

    *hits* must be ordered by ascending start offset. Returns ``b""`` when no
    hit precedes *limit*.
    """
    best = b""
    for start, value in hits:
        if start >= limit:
            break
        best = value
    return best


def _teams_from_config(config_buf):
    """Extract de-duplicated team dicts (url, name, token) from raw bytes."""
    # LevelDB uses prefix compression which corrupts JSON structure, so the
    # fields are located with regexes instead of a JSON parser.
    url_hits = [
        (m.start(), m.group(1))
        for m in re.finditer(rb'"url"\s*:\s*"(https://[^"]+slack\.com/[^"]*)"', config_buf)
    ]
    name_hits = [
        (m.start(), m.group(1))
        for m in re.finditer(rb'"name"\s*:\s*"([^"]{1,100})"', config_buf)
    ]

    teams = []
    seen_tokens = set()
    for m in re.finditer(rb'"token"\s*:\s*"(xoxc-[^"]+)"', config_buf):
        tok = m.group(1).decode("utf-8", errors="ignore")
        if tok in seen_tokens:
            continue
        seen_tokens.add(tok)
        # Pair the token with the closest url / name that precedes it.
        url = _nearest_before(url_hits, m.start())
        name = _nearest_before(name_hits, m.start())
        teams.append({
            "url": url.decode("utf-8", errors="ignore").rstrip("/"),
            "name": name.decode("utf-8", errors="ignore"),
            "token": tok,
        })
    return teams


def extract_teams_from_leveldb(slack_dir):
    """Extract xoxc team tokens from Slack's Local Storage LevelDB.

    The LevelDB directory is copied to a temporary snapshot first (Slack
    holds a lock on the live one); the snapshot is always removed before
    returning, including on errors.

    Args:
        slack_dir: Slack Desktop's application data directory.

    Returns:
        A list of ``{"url", "name", "token"}`` dicts, one per distinct token,
        in order of first appearance. ``url`` / ``name`` are ``""`` when none
        precedes the token. Returns ``[]`` when the LevelDB directory or any
        token is missing.
    """
    leveldb_dir = os.path.join(slack_dir, "Local Storage", "leveldb")
    if not os.path.isdir(leveldb_dir):
        return []

    snap = tempfile.mkdtemp(prefix="slack-leveldb-")
    try:
        db_dir = os.path.join(snap, "db")
        _snapshot_leveldb(leveldb_dir, db_dir)

        config_buf = _find_local_config(db_dir)
        if not config_buf:
            return []
        return _teams_from_config(config_buf)
    finally:
        shutil.rmtree(snap, ignore_errors=True)
|
|
|
|
|
# --- Cookie decryption --- |
|
|
|
def decrypt_cookie(encrypted_value, password):
    """Decrypt a Chromium cookie using AES-128-CBC (macOS) and pull out xoxd.

    A leading ``v10`` / ``v11`` tag is stripped. The AES key is derived with
    PBKDF2-HMAC-SHA1 (salt ``saltysalt``, 1003 iterations, 16 bytes) from
    *password*, and the IV is sixteen spaces. Padding is not removed; the
    token is located in the plaintext with a regex instead.

    Args:
        encrypted_value: the raw ``encrypted_value`` bytes of the cookie row.
        password: Slack's Safe Storage password.

    Returns:
        The URL-unquoted ``xoxd-...`` token, or ``None`` when the ciphertext
        is empty, is not a whole number of 16-byte blocks, or contains no
        token after decryption.
    """
    # Skip v10/v11 prefix
    if encrypted_value[:3] in (b"v10", b"v11"):
        data = encrypted_value[3:]
    else:
        data = encrypted_value

    # CBC without a padder only accepts whole 16-byte blocks; anything else
    # would make finalize() raise ValueError, so reject it up front.
    if not data or len(data) % 16 != 0:
        return None

    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

    salt = b"saltysalt"
    iv = b" " * 16
    key = hashlib.pbkdf2_hmac("sha1", password.encode(), salt, 1003, dklen=16)

    decryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).decryptor()
    decrypted = decryptor.update(data) + decryptor.finalize()

    # Find xoxd- token in decrypted bytes
    match = re.search(rb"xoxd-[A-Za-z0-9%/+_=.-]+", decrypted)
    if match is None:
        return None
    # The match is pure ASCII, so decoding and unquoting cannot fail.
    return urllib.parse.unquote(match.group(0).decode("utf-8"))
|
|
|
|
|
def extract_cookie_d(slack_dir):
    """Extract and decrypt the xoxd cookie from Slack's Cookies DB.

    The database is looked for at ``<slack_dir>/Network/Cookies`` and then
    ``<slack_dir>/Cookies``. It is copied into a private temporary directory
    before being opened, to avoid locking issues with the running app; the
    copy is removed again before returning.

    Args:
        slack_dir: Slack Desktop's application data directory.

    Returns:
        The ``xoxd-...`` cookie value, or ``None`` when the database, the
        Safe Storage password, or a usable ``d`` cookie cannot be found.
    """
    candidates = (
        os.path.join(slack_dir, "Network", "Cookies"),
        os.path.join(slack_dir, "Cookies"),
    )
    cookies_path = next((p for p in candidates if os.path.exists(p)), None)
    if cookies_path is None:
        return None

    password = get_safe_storage_password()
    if not password:
        print('{"error": "could not read Slack Safe Storage password from Keychain"}', file=sys.stderr)
        return None

    # TemporaryDirectory gives an owner-only directory and guaranteed
    # cleanup, unlike the race-prone tempfile.mktemp().
    with tempfile.TemporaryDirectory(prefix="slack-cookies-") as tmp_dir:
        tmp = os.path.join(tmp_dir, "Cookies.db")
        shutil.copy2(cookies_path, tmp)

        conn = sqlite3.connect(tmp)
        try:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                "SELECT host_key, name, value, encrypted_value FROM cookies "
                "WHERE name = 'd' AND host_key LIKE '%slack.com' "
                "ORDER BY length(encrypted_value) DESC"
            ).fetchall()
        finally:
            conn.close()

    # Rows come longest-ciphertext first; take the first one that yields a
    # cookie instead of giving up after the first row.
    for row in rows:
        # Check for unencrypted value first
        if row["value"] and row["value"].startswith("xoxd-"):
            return row["value"]

        encrypted = row["encrypted_value"]
        if not encrypted:  # NULL or empty blob
            continue

        cookie = decrypt_cookie(bytes(encrypted), password)
        if cookie:
            return cookie

    return None
|
|
|
|
|
# --- Main --- |
|
|
|
def resolve_team_url(xoxc, xoxd):
    """Call auth.test to get the actual workspace URL for a token.

    Args:
        xoxc: the workspace token, sent as the ``token`` form field.
        xoxd: the ``d`` cookie value; it is percent-encoded before being put
            into the ``Cookie`` header.

    Returns:
        A dict with ``url`` (trailing slash removed), ``team``, ``user`` and
        ``team_id`` when the API answers ``ok``; otherwise ``None``. Any
        failure (network, bad JSON, unexpected payload) also yields ``None``.
    """
    xoxd_enc = urllib.parse.quote(xoxd, safe="")
    try:
        req = urllib.request.Request(
            "https://slack.com/api/auth.test",
            data=urllib.parse.urlencode({"token": xoxc}).encode(),
            headers={"Cookie": f"d={xoxd_enc}"},
            method="POST",
        )
        # The context manager closes the HTTP response even if reading or
        # parsing fails.
        with urllib.request.urlopen(req, timeout=10) as response:
            resp = json.loads(response.read())
        if resp.get("ok"):
            return {
                "url": resp.get("url", "").rstrip("/"),
                "team": resp.get("team", ""),
                "user": resp.get("user", ""),
                "team_id": resp.get("team_id", ""),
            }
    except Exception:
        # Deliberately best-effort: the caller falls back to the URLs found
        # in LevelDB when resolution fails.
        pass
    return None
|
|
|
|
|
def extract_all():
    """Extract credentials from Slack Desktop and store them in the Keychain.

    Both the Electron and the App Store data directories are tried, in that
    order. For the first one that yields team tokens and a ``d`` cookie, the
    tokens are resolved to workspace URLs via auth.test (falling back to the
    LevelDB-derived teams when none resolve), everything is written to the
    Keychain, and a redacted JSON summary is printed to stdout.

    Returns:
        ``True`` after a successful store; ``False`` (with a JSON error on
        stderr) when no directory yielded usable credentials.
    """
    for slack_dir in (SLACK_DIR, SLACK_DIR_APPSTORE):
        if not os.path.isdir(slack_dir):
            continue

        teams = extract_teams_from_leveldb(slack_dir)
        cookie_d = extract_cookie_d(slack_dir)
        if not (teams and cookie_d):
            continue

        # Resolve actual workspace URLs via auth.test, one entry per URL.
        resolved = []
        seen_urls = set()
        for team in teams:
            info = resolve_team_url(team["token"], cookie_d)
            if not info or info["url"] in seen_urls:
                continue
            seen_urls.add(info["url"])
            resolved.append({
                "url": info["url"],
                "name": info["team"],
                "user": info["user"],
                "token": team["token"],
            })

        # Fall back to LevelDB URLs if auth.test fails for every token.
        if not resolved:
            resolved = teams

        keychain_set("xoxd", cookie_d)
        for team in resolved:
            keychain_set(f"xoxc:{team['url']}", team["token"])

        team_summaries = []
        for t in resolved:
            team_summaries.append({
                "url": t["url"],
                "name": t.get("name", ""),
                "user": t.get("user", ""),
                "token": f"xoxc-...{t['token'][-6:]}",
            })

        result = {
            "ok": True,
            "cookie_d": f"xoxd-...{cookie_d[-6:]}",
            "teams": team_summaries,
            "keychain_service": KEYCHAIN_SERVICE,
        }
        print(json.dumps(result))
        return True

    failure = {"ok": False, "error": "could not extract credentials from Slack Desktop"}
    print(json.dumps(failure), file=sys.stderr)
    return False
|
|
|
|
|
def find_stored_workspaces():
    """Find workspace URLs stored under our keychain service.

    Parses the output of ``security dump-keychain`` item by item and keeps
    the accounts of the form ``xoxc:<url>`` whose service equals
    KEYCHAIN_SERVICE. The ``acct`` and ``svce`` attributes are paired per
    item, so the result does not depend on the order in which the two
    attributes are printed.

    NOTE(review): assumes every item in the dump is introduced by a
    ``keychain:`` and/or ``class:`` header line — confirm against the
    ``security`` output on the target macOS version.

    Returns:
        The distinct workspace URLs, in order of first appearance.

    Raises:
        subprocess.CalledProcessError: if ``security dump-keychain`` fails.
    """
    raw = subprocess.check_output(["security", "dump-keychain"], stderr=subprocess.DEVNULL)
    # Other applications' items may contain bytes that are not valid UTF-8;
    # they must not abort the scan for ours.
    dump = raw.decode(errors="replace")

    current = {}
    items = [current]
    for line in dump.splitlines():
        stripped = line.strip()
        if stripped.startswith(("keychain:", "class:")):
            # Header line: a new item begins.
            current = {}
            items.append(current)
            continue
        for attr in ("acct", "svce"):
            prefix = f'"{attr}"<blob>='
            if stripped.startswith(prefix):
                current[attr] = stripped[len(prefix):].strip().strip('"')

    urls = []
    for item in items:
        account = item.get("acct", "")
        if item.get("svce") != KEYCHAIN_SERVICE or not account.startswith("xoxc:"):
            continue
        url = account[len("xoxc:"):]
        if url not in urls:
            urls.append(url)
    return urls
|
|
|
|
|
def check_credentials():
    """Verify stored Keychain credentials work against the Slack API.

    Reads the ``xoxd`` cookie and every stored ``xoxc:<url>`` token, posts
    each token to ``<url>/api/auth.test`` and prints a JSON report to stdout.

    Returns:
        ``True`` only when every stored workspace answers ``ok``; ``False``
        when the cookie or all tokens are missing, or any check fails.
    """
    xoxd = keychain_get("xoxd")
    if not xoxd:
        print(json.dumps({"ok": False, "error": "no xoxd in keychain"}))
        return False

    workspaces = find_stored_workspaces()
    if not workspaces:
        print(json.dumps({"ok": False, "error": "no xoxc entries in keychain"}))
        return False

    results = []
    xoxd_enc = urllib.parse.quote(xoxd, safe="")
    for url in workspaces:
        xoxc = keychain_get(f"xoxc:{url}")
        if not xoxc:
            results.append({"url": url, "ok": False, "error": "token not found in keychain"})
            continue
        try:
            req = urllib.request.Request(
                f"{url}/api/auth.test",
                data=urllib.parse.urlencode({"token": xoxc}).encode(),
                headers={"Cookie": f"d={xoxd_enc}"},
                method="POST",
            )
            # Close the HTTP response deterministically, even on parse errors.
            with urllib.request.urlopen(req, timeout=10) as response:
                resp = json.loads(response.read())
            results.append({"url": url, "ok": resp.get("ok", False), "user": resp.get("user"), "team": resp.get("team")})
        except Exception as e:
            # Report the failure for this workspace and keep checking the rest.
            results.append({"url": url, "ok": False, "error": str(e)})

    all_ok = all(r["ok"] for r in results)
    print(json.dumps({"ok": all_ok, "workspaces": results}))
    return all_ok
|
|
|
|
|
def show_credentials():
    """Print the stored credentials as indented JSON, with secrets redacted.

    Only the last six characters of the cookie and of each token are shown.
    Candidate accounts are taken from every ``acct`` line of
    ``security dump-keychain`` that contains ``xoxc:``; an account is listed
    only if a token can be read for it under KEYCHAIN_SERVICE.
    """
    xoxd = keychain_get("xoxd")
    dump_bytes = subprocess.check_output(["security", "dump-keychain"], stderr=subprocess.DEVNULL)
    dump = dump_bytes.decode()

    account_lines = (
        line for line in dump.splitlines()
        if '"acct"' in line and "xoxc:" in line
    )

    teams = []
    for line in account_lines:
        url = line.split("xoxc:", 1)[1].strip().strip('"')
        xoxc = keychain_get(f"xoxc:{url}")
        if not xoxc:
            continue
        teams.append({"url": url, "token": f"xoxc-...{xoxc[-6:]}"})

    redacted_cookie = f"xoxd-...{xoxd[-6:]}" if xoxd else None
    result = {
        "keychain_service": KEYCHAIN_SERVICE,
        "cookie_d": redacted_cookie,
        "teams": teams,
    }
    print(json.dumps(result, indent=2))
|
|
|
|
|
def main():
    """Command-line entry point.

    ``--check`` verifies the stored credentials and ``--show`` prints them
    redacted; with neither flag the credentials are extracted and stored.
    ``--check`` takes precedence when both flags are given. The check and
    extract modes exit with status 0 on success and 1 on failure.
    """
    parser = argparse.ArgumentParser(description="Extract Slack Desktop credentials to Keychain")
    parser.add_argument("--check", action="store_true", help="Verify stored credentials")
    parser.add_argument("--show", action="store_true", help="Show stored credentials (redacted)")
    options = parser.parse_args()

    if options.show and not options.check:
        show_credentials()
        return

    action = check_credentials if options.check else extract_all
    sys.exit(0 if action() else 1)


if __name__ == "__main__":
    main()