# GitHub gist Terrance/4d9df2d550806fb038d18956fbe3b268 -- last active March 1, 2024 18:37.
# GitHub's viewer attached its notice that this file may contain hidden or bidirectional
# Unicode text; open it in an editor that reveals hidden Unicode characters to review.
"""Methods to decrypt and re-encrypt database backups for Conversations, an Android XMPP client."""
from collections import defaultdict
from getpass import getpass
import gzip
import hashlib
import io
import json
import logging
import os
import pathlib
import sqlite3
import struct
from typing import Any, BinaryIO, Optional

from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
# Module-level logger used for the debug traces emitted while reading and writing backups.
LOG = logging.getLogger(__name__)

# Table definitions executed against the target database before any backup
# content is loaded into it.  ``encrypt`` drops the ``CREATE TABLE`` statements
# from its dump, so a version 1 backup carries no schema of its own and relies
# on this script having been run first.
SCHEMA = """
CREATE TABLE accounts (
    uuid TEXT PRIMARY KEY,
    username TEXT,
    server TEXT,
    password TEXT,
    display_name TEXT,
    status TEXT,
    status_message TEXT,
    rosterversion TEXT,
    options NUMBER,
    avatar TEXT,
    keys TEXT,
    hostname TEXT,
    resource TEXT,
    port NUMBER,
    pinned_mechanism TEXT,
    pinned_channel_binding TEXT,
    fast_mechanism TEXT,
    fast_token TEXT
);
CREATE TABLE conversations (
    uuid TEXT PRIMARY KEY,
    name TEXT,
    contactUuid TEXT,
    accountUuid TEXT,
    contactJid TEXT,
    created NUMBER,
    status NUMBER,
    mode NUMBER,
    attributes TEXT,
    FOREIGN KEY (accountUuid) REFERENCES accounts (uuid) ON DELETE CASCADE
);
CREATE TABLE messages (
    uuid TEXT PRIMARY KEY,
    conversationUuid TEXT,
    timeSent NUMBER,
    counterpart TEXT,
    trueCounterpart TEXT,
    body TEXT,
    encryption NUMBER,
    status NUMBER,
    type NUMBER,
    relativeFilePath TEXT,
    serverMsgId TEXT,
    axolotl_fingerprint TEXT,
    carbon INTEGER,
    edited TEXT,
    read NUMBER,
    oob INTEGER,
    errorMsg TEXT,
    readByMarkers TEXT,
    markable NUMBER,
    deleted NUMBER,
    bodyLanguage TEXT,
    remoteMsgId TEXT,
    FOREIGN KEY (conversationUuid) REFERENCES conversations (uuid) ON DELETE CASCADE
);
CREATE TABLE sessions (
    account TEXT,
    name TEXT,
    device_id INTEGER,
    key TEXT,
    FOREIGN KEY (account) REFERENCES accounts (uuid) ON DELETE CASCADE,
    UNIQUE (account, name, device_id) ON CONFLICT REPLACE
);
CREATE TABLE prekeys (
    account TEXT,
    id INTEGER,
    key TEXT,
    FOREIGN KEY (account) REFERENCES accounts (uuid) ON DELETE CASCADE,
    UNIQUE (account, id) ON CONFLICT REPLACE
);
CREATE TABLE signed_prekeys (
    account TEXT,
    id INTEGER,
    key TEXT,
    FOREIGN KEY (account) REFERENCES accounts (uuid) ON DELETE CASCADE,
    UNIQUE (account, id) ON CONFLICT REPLACE
);
CREATE TABLE identities (
    account TEXT,
    name TEXT,
    ownkey INTEGER,
    fingerprint TEXT,
    certificate BLOB,
    trust TEXT,
    active NUMBER,
    last_activation NUMBER,
    key TEXT,
    FOREIGN KEY (account) REFERENCES accounts (uuid) ON DELETE CASCADE,
    UNIQUE (account, name, fingerprint) ON CONFLICT IGNORE
);
"""
class StreamReader:
    """Read big-endian binary fields from a byte stream.

    Each accessor consumes exactly the bytes of the field it decodes, so calls
    must be made in the order the fields appear in the stream.
    """

    def __init__(self, stream: BinaryIO):
        self.stream = stream

    def _read(self, fmt: str):
        """Consume ``struct.calcsize(fmt)`` bytes and return the single unpacked value.

        Raises:
            struct.error: if the stream ends before the field is complete.
        """
        return struct.unpack(fmt, self.stream.read(struct.calcsize(fmt)))[0]

    def short(self) -> int:
        """Read a signed 16-bit integer."""
        return self._read(">h")

    def long(self) -> int:
        """Read a signed 64-bit integer."""
        return self._read(">q")

    def int(self) -> int:
        """Read a signed 32-bit integer."""
        return self._read(">i")

    def utf(self) -> bytes:
        """Read a length-prefixed byte string and return its raw (undecoded) bytes.

        Raises:
            struct.error: if the length prefix itself is truncated.
            EOFError: if fewer payload bytes are available than the prefix announces.
        """
        # The prefix is a byte count and can never be negative.  Decoding it as
        # signed turned every length above 32767 into a negative number, and
        # ``read()`` with a negative size swallows the rest of the stream.
        # NOTE(review): the layout presumably mirrors Java's
        # DataOutputStream.writeUTF (unsigned 16-bit length) -- confirm.
        size = self._read(">H")
        payload = self.stream.read(size)
        if len(payload) != size:
            raise EOFError(
                "Expected {} byte(s) of string data, got {}".format(size, len(payload))
            )
        return payload
class StreamWriter:
    """Write big-endian binary fields to a byte stream.

    Counterpart of the reader used for backup headers: every method appends
    one field to ``stream`` in the encoding the reader expects.
    """

    def __init__(self, stream: BinaryIO):
        self.stream = stream

    def _write(self, fmt: str, value: Any) -> None:
        """Pack ``value`` with ``fmt`` and append it to the stream.

        Raises:
            struct.error: if ``value`` does not fit the requested format.
        """
        self.stream.write(struct.pack(fmt, value))

    def short(self, value: int) -> None:
        """Write a signed 16-bit integer."""
        self._write(">h", value)

    def long(self, value: int) -> None:
        """Write a signed 64-bit integer."""
        self._write(">q", value)

    def int(self, value: int) -> None:
        """Write a signed 32-bit integer."""
        self._write(">i", value)

    def utf(self, value: bytes) -> None:
        """Write ``value`` preceded by its length as a 16-bit prefix.

        Raises:
            struct.error: if ``value`` is longer than 65535 bytes; nothing is
                written in that case because packing fails before the write.
        """
        # A byte count is never negative, so the prefix is written unsigned.
        # The signed form rejected payloads of 32768..65535 bytes even though
        # a 16-bit prefix can represent them.
        self._write(">H", len(value))
        self.stream.write(value)
def params(stream: BinaryIO):
    """Parse the backup header from ``stream`` and leave it positioned after the salt.

    Args:
        stream: binary stream positioned at the start of a backup file.

    Returns:
        A tuple ``(ver, app, jid, ts, nonce, salt)`` where ``ver`` is 1 or 2,
        ``app`` and ``jid`` are raw bytes, ``ts`` is an integer, ``nonce`` is
        12 bytes and ``salt`` is 16 bytes.

    Raises:
        ValueError: if the version is not 1 or 2, or the nonce/salt are cut short.
    """
    reader = StreamReader(stream)
    ver = reader.int()
    # Explicit raise instead of ``assert``: assertions vanish under ``python -O``
    # and an unsupported file would then be parsed as if it were valid.
    if ver not in (1, 2):
        raise ValueError("Unexpected backup version {}".format(ver))
    app = reader.utf()
    jid = reader.utf()
    ts = reader.long()
    LOG.debug("Backup header: app=%r, jid=%r, ts=%r", app, jid, ts)
    nonce = stream.read(12)
    salt = stream.read(16)
    # A short nonce or salt would otherwise only surface later as an opaque
    # failure inside key derivation or decryption.
    if len(nonce) != 12 or len(salt) != 16:
        raise ValueError("Backup header is truncated")
    return (ver, app, jid, ts, nonce, salt)
def cipher(password: str, salt: bytes):
    """Build the AES-GCM cipher object for a backup.

    The key is derived from ``password`` and ``salt`` with PBKDF2-HMAC-SHA1,
    1024 iterations, and is 128 bits long.

    Args:
        password: passphrase protecting the backup.
        salt: salt bytes taken from the backup header.

    Returns:
        An ``AESGCM`` instance keyed with the derived key.
    """
    key_length = 128 // 8
    secret = password.encode()
    derived = hashlib.pbkdf2_hmac("sha1", secret, salt, 1024, key_length)
    return AESGCM(derived)
def _quote_identifier(name: str) -> str:
    """Return ``name`` as a double-quoted SQL identifier with embedded quotes doubled."""
    return '"{}"'.format(name.replace('"', '""'))


def decrypt(backup_file: pathlib.Path, database_file: pathlib.Path, password: Optional[str] = None):
    """Decrypt ``backup_file`` and load its contents into a new SQLite database.

    The schema is created first; a version 1 backup is then executed as an SQL
    script, while a version 2 backup is parsed as JSON and inserted row by row.

    Args:
        backup_file: path of the encrypted backup to read.
        database_file: path of the SQLite database to create and fill.
        password: backup password; prompted for interactively when empty or ``None``.

    Raises:
        RuntimeError: if the backup cannot be authenticated with the password.
    """
    with open(backup_file, "rb") as stream:
        ver, _, _, _, nonce, salt = params(stream)
        # Everything after the header is ciphertext (including the GCM tag).
        payload = stream.read()
    if not password:
        password = getpass("Enter decryption password: ")
    gcm = cipher(password, salt)
    try:
        zipped = gcm.decrypt(nonce, payload, None)
    except InvalidTag as err:
        raise RuntimeError("Failed to decrypt backup with the given password") from err
    data = gzip.decompress(zipped).decode()
    LOG.debug("Decrypted %r bytes", len(data))
    conn = sqlite3.connect(str(database_file))
    try:
        conn.executescript(SCHEMA)
        if ver == 1:
            # Version 1 payloads are a plain SQL script.
            conn.executescript(data)
            LOG.debug("Applied SQL to database file")
        elif ver == 2:
            # Version 2 payloads are a JSON list of {"table": ..., "values": {...}}.
            tables = defaultdict(list)
            for row in json.loads(data):
                tables[row["table"]].append(row["values"])
            for table, rows in tables.items():
                for row in rows:
                    # Identifiers come from the backup itself, so they are
                    # quoted rather than pasted into the statement verbatim;
                    # values are bound positionally in the same key order.
                    names = ", ".join(_quote_identifier(column) for column in row)
                    placeholders = ", ".join("?" * len(row))
                    conn.execute(
                        "INSERT INTO {} ({}) VALUES ({})".format(
                            _quote_identifier(table), names, placeholders
                        ),
                        list(row.values()),
                    )
                LOG.debug("Inserted %d row(s) to table %r", len(rows), table)
        conn.commit()
    finally:
        # Release the database file even when the schema or an insert fails.
        conn.close()
def encrypt(database_file: pathlib.Path, old_backup_file: pathlib.Path, backup_file: pathlib.Path, password: Optional[str] = None):
    """Re-encrypt ``database_file`` as a version 1 backup written to ``backup_file``.

    The application name, JID, timestamp and salt are copied from the header
    of ``old_backup_file``; the database is dumped as SQL without its
    ``CREATE TABLE`` statements, gzip-compressed and encrypted.

    Args:
        database_file: SQLite database to export.
        old_backup_file: existing backup whose header fields are reused.
        backup_file: path of the backup file to write.
        password: backup password; prompted for interactively when empty or ``None``.
    """
    with open(old_backup_file, "rb") as stream:
        _, app, jid, ts, _, salt = params(stream)
    db = sqlite3.connect(str(database_file))
    try:
        sql = "\n".join(query for query in db.iterdump() if not query.startswith("CREATE TABLE "))
    finally:
        db.close()
    zipped = gzip.compress(sql.encode())
    if not password:
        password = getpass("Enter encryption password: ")
    # Reusing the old nonce with the same password and salt would encrypt a
    # different plaintext under an identical key/nonce pair, which AES-GCM
    # does not tolerate.  The nonce is stored in the header, so a fresh random
    # one is read back like any other.
    nonce = os.urandom(12)
    data = cipher(password, salt).encrypt(nonce, zipped, None)
    # Open the output only once the ciphertext exists, so an aborted password
    # prompt or a failed dump does not leave a header-only file behind.
    with open(backup_file, "wb") as stream:
        writer = StreamWriter(stream)
        writer.int(1)
        writer.utf(app)
        writer.utf(jid)
        writer.long(ts)
        stream.write(nonce)
        stream.write(salt)
        stream.write(data)
    LOG.debug("Encrypted %r bytes", len(data))
# GitHub page footer: sign up for free to join this conversation on GitHub, or sign in to comment if you already have an account.