Skip to content

Instantly share code, notes, and snippets.

@mnixry
Created March 21, 2023 17:16
Show Gist options
  • Select an option

  • Save mnixry/316a3d9c7d46120677d26f4d3715b778 to your computer and use it in GitHub Desktop.

Select an option

Save mnixry/316a3d9c7d46120677d26f4d3715b778 to your computer and use it in GitHub Desktop.
Decrypt all TEXT and BLOB fields in MobileQQ/TIM database and update records in-place.
import itertools
import sqlite3
import sys
import rich
import rich.progress
def decrypt_blob(data: bytes, byte_key: bytes) -> bytes:
key_iterator = itertools.cycle(byte_key)
return bytes(byte ^ next(key_iterator) for byte in data)
def decrypt_text(data: str, byte_key: bytes) -> str:
key_iterator = itertools.cycle(byte_key)
decrypted_data = "".join(
chr(
char
^ (
next(key_iterator)
if char <= 0xFFFF
else ((next(key_iterator) << 10) + next(key_iterator))
)
)
for char in map(ord, data)
)
return decrypted_data
def all_tables_info(conn: sqlite3.Connection):
table_results = conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
suspicious_columns: dict[str, list[str]] = {}
for result in table_results:
table_name, *_ = result
table_columns = conn.execute(f"PRAGMA table_info({table_name})")
for column in table_columns:
_, column_name, column_type, *_ = column
if column_type in {"BLOB", "TEXT"}:
suspicious_columns.setdefault(table_name, []).append(column_name)
return suspicious_columns
def update_table(
progress: rich.progress.Progress,
conn: sqlite3.Connection,
decryption_key: bytes,
table_name: str,
column_names: list[str],
):
column_names_str = ", ".join(column_names)
select_query = f"SELECT rowid, {column_names_str} FROM {table_name}"
update_query = (
f"UPDATE {table_name} SET "
+ ", ".join(f"{column_name} = ?" for column_name in column_names)
+ " WHERE rowid = ?"
)
total_records, *_ = conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()
if total_records <= 0:
return
task = progress.add_task("Updating", total=total_records)
rich.print(f"[bold green]Updating[/bold green] [yellow]{table_name}[/yellow]")
for row in conn.execute(select_query):
rowid, *columns = row
for index, column in enumerate(columns):
if column is None:
continue
elif isinstance(column, bytes):
columns[index] = decrypt_blob(column, decryption_key)
elif isinstance(column, str):
columns[index] = decrypt_text(column, decryption_key)
try:
conn.execute(update_query, (*columns, rowid))
except (sqlite3.OperationalError, sqlite3.IntegrityError):
progress.log(
f"[bold red]Error[/bold red] updating {table_name} {rowid=}, "
f"{update_query=} {columns=}"
)
progress.update(task, advance=1)
progress.remove_task(task)
if __name__ == "__main__":
assert (
len(sys.argv) == 3
), "Usage: python decrypt_chatbase.py <encrypted_file> <imei>"
_, encrypted_file, key = sys.argv
connection = sqlite3.connect(encrypted_file)
rich.print("[bold green]Connected to database[/bold green]")
with rich.progress.Progress() as progress:
tables_info = all_tables_info(connection)
for table_name, columns in tables_info.items():
update_table(progress, connection, key.encode(), table_name, columns)
connection.commit()
connection.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment