Last active
April 14, 2026 22:13
-
-
Save almet/5bbf4f2ec6d1160f70f4c43897d7fa37 to your computer and use it in GitHub Desktop.
Export Signal group media (GNOME keyring on Linux, Keychain on macOS)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # /// script | |
| # dependencies = [ | |
| # "cryptography", | |
| # ] | |
| # /// | |
| """Export media from a Signal Desktop group conversation to a folder.""" | |
| import base64 | |
| import binascii | |
| import hashlib | |
| import hmac | |
| import json | |
| import mimetypes | |
| import platform | |
| import subprocess | |
| import sys | |
| from datetime import datetime | |
| from pathlib import Path | |
| from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | |
# Signal Desktop profile directory: macOS keeps it under Application Support;
# every other platform is treated like Linux (~/.config).
SIGNAL_DIR = (
    Path.home().joinpath("Library", "Application Support", "Signal")
    if platform.system() == "Darwin"
    else Path.home().joinpath(".config", "Signal")
)
# Database file that the script opens with the sqlcipher CLI.
DB_PATH = SIGNAL_DIR.joinpath("sql", "db.sqlite")
# Directory holding the encrypted attachment files.
ATTACHMENTS_DIR = SIGNAL_DIR.joinpath("attachments.noindex")
# JSON config that carries the (encrypted) database key.
CONFIG_PATH = SIGNAL_DIR.joinpath("config.json")
def _unpad_db_key(padded, block_size=16):
    """Validate and remove PKCS#7 padding; raise ValueError if it is malformed."""
    if not padded or len(padded) % block_size:
        raise ValueError("decrypted data is not a whole number of blocks")
    pad = padded[-1]
    if not 1 <= pad <= block_size or padded[-pad:] != bytes([pad]) * pad:
        raise ValueError("invalid PKCS#7 padding")
    return padded[:-pad]


def _macos_safe_storage_password():
    """Read the 'Signal Safe Storage' secret from the macOS Keychain as bytes."""
    try:
        result = subprocess.run(
            ["security", "find-generic-password", "-s", "Signal Safe Storage", "-w"],
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError:
        sys.exit("Could not find Signal keyring entry in macOS Keychain")
    return result.stdout.strip().encode()


def _linux_keyring_password():
    """Read Signal's os_crypt secret from the default Secret Service collection."""
    try:
        import secretstorage
    except ImportError:
        sys.exit("secretstorage is required on Linux. Install it with: pip install secretstorage")
    conn = secretstorage.dbus_init()
    collection = secretstorage.get_default_collection(conn)
    # Reading secrets from a locked collection fails, so ask for an unlock first.
    if collection.is_locked():
        collection.unlock()
    for item in collection.get_all_items():
        attrs = item.get_attributes()
        if (
            attrs.get("xdg:schema") == "chrome_libsecret_os_crypt_password_v2"
            and attrs.get("application") == "Signal"
        ):
            return item.get_secret()
    sys.exit("Could not find Signal keyring entry")


def get_db_key():
    """Decrypt the Signal DB key from config.json + platform keyring.

    Returns:
        The database key as a string, ready to be embedded in
        ``PRAGMA key = "x'<key>'"``.

    Exits (``sys.exit``) with a readable message when the config cannot be
    read, the keyring entry is missing, the encryption version prefix is not
    recognised, or the decrypted data is not validly padded (which is what a
    wrong keyring password produces).
    """
    try:
        config = json.loads(CONFIG_PATH.read_text())
    except (OSError, json.JSONDecodeError) as exc:
        sys.exit(f"Could not read Signal config {CONFIG_PATH}: {exc}")

    encrypted_hex = config.get("encryptedKey")
    if not encrypted_hex:
        # NOTE(review): older Signal Desktop releases are believed to store the
        # key unencrypted under "key"; use it when present -- confirm.
        legacy_key = config.get("key")
        if legacy_key:
            return legacy_key
        sys.exit(f"No 'encryptedKey' entry found in {CONFIG_PATH}")

    try:
        encrypted_raw = binascii.unhexlify(encrypted_hex)
    except (binascii.Error, ValueError, TypeError):
        sys.exit("The 'encryptedKey' entry in the Signal config is not valid hex")

    version = encrypted_raw[:3]
    if platform.system() == "Darwin":  # macOS
        password = _macos_safe_storage_password()
        if version != b"v10":
            sys.exit(f"Unexpected encryption version: {encrypted_raw[:3]}")
        # Chromium os_crypt on macOS: 1003 PBKDF2 rounds.
        iterations = 1003
    else:  # Linux
        if version == b"v11":
            # v11: the password lives in the desktop keyring.
            password = _linux_keyring_password()
        elif version == b"v10":
            # v10 on Linux: Chromium os_crypt falls back to a hard-coded
            # password when no keyring backend is in use.
            password = b"peanuts"
        else:
            sys.exit(f"Unexpected encryption version: {encrypted_raw[:3]}")
        # Chromium os_crypt on Linux: a single PBKDF2 round.
        iterations = 1

    derived = hashlib.pbkdf2_hmac("sha1", password, b"saltysalt", iterations, dklen=16)

    # Layout: 3-byte version prefix + AES-128-CBC ciphertext, IV = 16 spaces.
    ciphertext = encrypted_raw[3:]
    if not ciphertext or len(ciphertext) % 16:
        sys.exit("Encrypted Signal DB key has an unexpected length")
    cipher = Cipher(algorithms.AES(derived), modes.CBC(b" " * 16))
    dec = cipher.decryptor()
    padded = dec.update(ciphertext) + dec.finalize()
    try:
        # UnicodeDecodeError is a ValueError, so both failure modes land here.
        return _unpad_db_key(padded).decode()
    except ValueError:
        sys.exit("Could not decrypt the Signal DB key (wrong keyring password?)")
def query_db(db_key, sql):
    """Run a SQL query via sqlcipher CLI and return parsed JSON rows.

    Args:
        db_key: Key string embedded in ``PRAGMA key = "x'<key>'"``.
        sql: SQL text to run after the database has been unlocked.

    Returns:
        The rows as parsed from sqlcipher's ``.mode json`` output, or ``[]``
        when the query produced no output.

    Exits (``sys.exit``) when the ``sqlcipher`` binary is missing, when it
    reports a failure, or when its output is not valid JSON. Without these
    checks a wrong key would look like an empty result set.
    """
    full_sql = (
        f"PRAGMA key = \"x'{db_key}'\";\n"
        f"PRAGMA cipher_compatibility = 4;\n"
        f".mode json\n"
        f"{sql}\n"
    )
    try:
        result = subprocess.run(
            ["sqlcipher", str(DB_PATH)],
            input=full_sql,
            capture_output=True,
            text=True,
        )
    except FileNotFoundError:
        sys.exit("The 'sqlcipher' command was not found. Install it and make sure it is on PATH.")

    # Filter out the "ok" line printed by PRAGMA key before parsing JSON.
    lines = [line for line in result.stdout.strip().splitlines() if line.strip() != "ok"]
    stderr = result.stderr.strip()

    # A non-zero exit, or no rows together with error output, means the query failed.
    if result.returncode != 0 or (not lines and stderr):
        sys.exit(f"sqlcipher failed (exit code {result.returncode}): {stderr or 'no error output'}")

    if not lines:
        return []
    try:
        return json.loads("\n".join(lines))
    except json.JSONDecodeError as exc:
        sys.exit(f"Could not parse sqlcipher output as JSON: {exc}")
def list_groups(db_key):
    """Fetch the active group conversations, ordered by latest activity first.

    Each returned row carries the conversation ``id`` and ``name``.
    """
    groups_sql = (
        "SELECT id, name FROM conversations "
        "WHERE type = 'group' AND active_at IS NOT NULL "
        "ORDER BY active_at DESC;"
    )
    rows = query_db(db_key, groups_sql)
    return rows
def get_attachments(db_key, conversation_id):
    """Get all downloadable attachments for a conversation.

    Args:
        db_key: Database key passed through to ``query_db``.
        conversation_id: Value matched against ``conversationId``.

    Returns:
        Rows with ``path``, ``contentType``, ``fileName``, ``size``,
        ``localKey`` and ``sentAt``, oldest first. Only rows that have both a
        stored path and a local key are returned.
    """
    # The id is spliced into SQL text, so double any single quote to keep it
    # inside the string literal.
    quoted_id = str(conversation_id).replace("'", "''")
    return query_db(
        db_key,
        f"""SELECT path, contentType, fileName, size, localKey, sentAt
FROM message_attachments
WHERE conversationId = '{quoted_id}'
AND path IS NOT NULL
AND localKey IS NOT NULL
AND attachmentType = 'attachment'
ORDER BY sentAt ASC;""",
    )
def decrypt_attachment(path, local_key):
    """Decrypt a Signal attachment file. Returns the plaintext bytes.

    Args:
        path: Attachment path relative to ``ATTACHMENTS_DIR``.
        local_key: Base64 text holding the AES key (first 32 bytes) followed
            by the HMAC key.

    Returns:
        The decrypted bytes with the PKCS#7 padding removed.

    Raises:
        FileNotFoundError: The attachment file does not exist.
        ValueError: The key or file has an impossible size, the HMAC does not
            match, or the padding is malformed.
    """
    file_path = ATTACHMENTS_DIR / path
    key_bytes = base64.b64decode(local_key)
    if len(key_bytes) < 64:
        raise ValueError(
            f"localKey for {path} is {len(key_bytes)} bytes, expected 64"
        )
    aes_key = key_bytes[:32]
    hmac_key = key_bytes[32:]

    data = file_path.read_bytes()
    # File layout: 16-byte IV + ciphertext (whole AES blocks) + 32-byte HMAC.
    if len(data) < 16 + 16 + 32 or (len(data) - 48) % 16:
        raise ValueError(f"Attachment file is truncated or malformed: {path}")
    iv = data[:16]
    hmac_sig = data[-32:]
    ciphertext = data[16:-32]

    # Verify HMAC-SHA256 over IV + ciphertext before decrypting anything.
    expected = hmac.new(hmac_key, iv + ciphertext, hashlib.sha256).digest()
    if not hmac.compare_digest(expected, hmac_sig):
        raise ValueError(f"HMAC verification failed for {path}")

    # Decrypt AES-256-CBC.
    cipher = Cipher(algorithms.AES(aes_key), modes.CBC(iv))
    dec = cipher.decryptor()
    plaintext = dec.update(ciphertext) + dec.finalize()

    # Strip PKCS#7 padding. Report bad padding instead of returning data that
    # still has the padding attached.
    pad = plaintext[-1]
    if not 1 <= pad <= 16 or plaintext[-pad:] != bytes([pad]) * pad:
        raise ValueError(f"Invalid padding for {path}")
    return plaintext[:-pad]
def extension_for(content_type, filename):
    """Get a file extension from content type or filename.

    Args:
        content_type: MIME type, optionally with parameters
            (``"image/png; charset=binary"``); may be ``None`` or empty.
        filename: Original file name; may be ``None`` or empty.

    Returns:
        The suffix of ``filename`` when it has one, otherwise the extension
        guessed from the MIME type, otherwise ``""``.
    """
    if filename:
        ext = Path(filename).suffix
        if ext:
            return ext

    # mimetypes only matches the bare "type/subtype", so drop any parameters.
    mime = (content_type or "").split(";", 1)[0].strip()
    ext = mimetypes.guess_extension(mime) if mime else None
    # mimetypes returns .jpe for image/jpeg sometimes
    if ext == ".jpe":
        return ".jpg"
    return ext or ""
def pick_group(groups):
    """Interactive group selection.

    Prints the groups as a numbered list and keeps prompting until a number
    inside the list is entered.

    Args:
        groups: Sequence of rows, each with a ``name`` entry.

    Returns:
        The selected row from ``groups``.

    Exits (``sys.exit``) when standard input reaches end of file. Retrying
    then can never succeed, so it would only spin forever.
    """
    print("\nSignal groups (most recent first):\n")
    for i, g in enumerate(groups, 1):
        # A group row may carry a null name; show a placeholder instead of "None".
        print(f" {i:3d}. {g['name'] or '(unnamed group)'}")
    print()
    while True:
        try:
            choice = input("Select a group [number]: ").strip()
        except EOFError:
            sys.exit("\nNo group selected (end of input).")
        try:
            idx = int(choice) - 1
        except ValueError:
            idx = -1
        if 0 <= idx < len(groups):
            return groups[idx]
        print("Invalid choice, try again.")
def pick_output_dir():
    """Ask for output directory.

    Keeps prompting until a non-empty path is entered that can be created
    (or already exists) as a directory.

    Returns:
        The expanded, resolved ``Path`` of the directory.

    Exits (``sys.exit``) when standard input reaches end of file.
    """
    while True:
        try:
            path = input("Export to folder: ").strip()
        except EOFError:
            sys.exit("\nNo output folder given (end of input).")
        if not path:
            continue
        p = Path(path).expanduser().resolve()
        try:
            p.mkdir(parents=True, exist_ok=True)
        except OSError as exc:
            # e.g. the path is an existing file, or permission is denied:
            # explain and ask again rather than crash.
            print(f"Cannot use {p}: {exc}")
            continue
        return p
def _export_name(att):
    """Build the output file name for one attachment row.

    The name is ``<sent date>_<original file name>``, or, when there is no
    usable original name, ``<sent date>_<stored file name><extension>``.
    """
    sent_timestamp = att.get("sentAt") or 0
    date_prefix = "unknown-date_"
    if sent_timestamp:
        try:
            # sentAt is divided by 1000: it is treated as milliseconds.
            sent_date = datetime.fromtimestamp(sent_timestamp / 1000)
            date_prefix = sent_date.strftime("%Y-%m-%d_%H-%M-%S_")
        except (OverflowError, OSError, ValueError, TypeError):
            # An unusable timestamp should not abort the whole export.
            pass

    # fileName comes from the message sender: keep only the last path
    # component so it cannot point outside the output folder.
    original = Path(str(att.get("fileName") or "").replace("\\", "/")).name
    if original:
        return date_prefix + original
    ext = extension_for(att.get("contentType"), None)
    return date_prefix + Path(att["path"]).name + ext


def _unique_destination(output_dir, name):
    """Return a path in output_dir for name, adding _1, _2, ... if it exists."""
    dest = output_dir / name
    stem = dest.stem
    suffix = dest.suffix
    counter = 1
    while dest.exists():
        dest = output_dir / f"{stem}_{counter}{suffix}"
        counter += 1
    return dest


def main():
    """Interactively export the attachments of one Signal group to a folder.

    Reads the database key, lets the user pick a group and an output folder,
    then decrypts every attachment of that group into the folder. Prints a
    line per exported file and a final summary of exported, missing and
    failed attachments.
    """
    print("Reading Signal database key...")
    db_key = get_db_key()
    print("Loading groups...")
    groups = list_groups(db_key)
    if not groups:
        sys.exit("No groups found.")
    group = pick_group(groups)
    print(f"\nSelected: {group['name']}")
    output_dir = pick_output_dir()
    print(f"Exporting to: {output_dir}\n")
    attachments = get_attachments(db_key, group["id"])
    if not attachments:
        print("No media found in this group.")
        return
    total = len(attachments)
    print(f"Found {total} attachments.\n")

    exported = 0
    skipped = 0
    errors = 0
    for att in attachments:
        name = _export_name(att)

        try:
            plaintext = decrypt_attachment(att["path"], att["localKey"])
        except FileNotFoundError:
            # The database references a file that is no longer on disk.
            skipped += 1
            continue
        except Exception as e:
            # Per-file boundary: one bad attachment must not stop the export.
            errors += 1
            print(f" ERROR: {name}: {e}", file=sys.stderr)
            continue

        # NOTE(review): Signal appears to zero-pad attachment content to a
        # bucket size before encrypting, with `size` holding the real length.
        # Trim only when size is a sane smaller value -- confirm against
        # Signal Desktop's attachment crypto.
        size = att.get("size")
        if isinstance(size, int) and 0 < size < len(plaintext):
            plaintext = plaintext[:size]

        # Avoid overwriting: add counter suffix.
        dest = _unique_destination(output_dir, name)
        try:
            dest.write_bytes(plaintext)
        except OSError as e:
            # A write failure is an error, not a "missing" attachment.
            errors += 1
            print(f" ERROR: {name}: {e}", file=sys.stderr)
            continue

        exported += 1
        print(f" [{exported}/{total}] {dest.name} ({att.get('contentType')}, {len(plaintext):,} bytes)")

    print(f"\nDone: {exported} exported, {skipped} missing, {errors} errors.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment