Created
March 21, 2023 17:16
-
-
Save mnixry/316a3d9c7d46120677d26f4d3715b778 to your computer and use it in GitHub Desktop.
Decrypt all TEXT and BLOB fields in MobileQQ/TIM database and update records in-place.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import itertools | |
| import sqlite3 | |
| import sys | |
| import rich | |
| import rich.progress | |
def decrypt_blob(data: bytes, byte_key: bytes) -> bytes:
    """XOR every byte of ``data`` with a repeating ``byte_key``.

    Byte ``i`` of the input is combined with ``byte_key[i % len(byte_key)]``.
    Because XOR is its own inverse, applying the function twice with the same
    key yields the original bytes again.

    Args:
        data: Bytes to transform.
        byte_key: Key bytes, repeated as often as needed to cover ``data``.

    Returns:
        A new ``bytes`` object of the same length as ``data``.

    Raises:
        ValueError: If ``data`` is non-empty and ``byte_key`` is empty.
    """
    if not data:
        return b""
    if not byte_key:
        # Without this check the failure surfaces as an opaque
        # "generator raised StopIteration" RuntimeError.
        raise ValueError("byte_key must not be empty")
    key_stream = itertools.cycle(byte_key)
    return bytes(byte ^ key_byte for byte, key_byte in zip(data, key_stream))
def decrypt_text(data: str, byte_key: bytes) -> str:
    """XOR the characters of ``data`` with a repeating ``byte_key``.

    Characters up to U+FFFF consume one key byte, which is XORed into the
    code point. Characters above U+FFFF consume two key bytes and are handled
    as their UTF-16 surrogate pair would be: the first key byte is applied to
    the high surrogate's payload and the second to the low surrogate's
    payload. Working on the 20-bit payload (code point minus 0x10000) keeps
    the result inside the supplementary range, so the transform stays its own
    inverse and ``chr`` can never be handed a value above U+10FFFF.

    NOTE(review): this presumes the data was originally XORed per UTF-16 code
    unit -- confirm against the producer of the database.

    Args:
        data: Text to transform.
        byte_key: Key bytes, repeated as often as needed to cover ``data``.

    Returns:
        The transformed text, with the same number of characters as ``data``.

    Raises:
        ValueError: If ``data`` is non-empty and ``byte_key`` is empty.
    """
    if not data:
        return ""
    if not byte_key:
        # Without this check the failure surfaces as an opaque
        # "generator raised StopIteration" RuntimeError.
        raise ValueError("byte_key must not be empty")
    key_iterator = itertools.cycle(byte_key)
    decrypted_chars = []
    for code_point in map(ord, data):
        if code_point <= 0xFFFF:
            decrypted_chars.append(chr(code_point ^ next(key_iterator)))
            continue
        # Two key bytes, in order: one per UTF-16 code unit of the pair.
        high_key = next(key_iterator)
        low_key = next(key_iterator)
        # Key bytes are < 256, so each fits in its surrogate's 10-bit payload.
        payload = (code_point - 0x10000) ^ ((high_key << 10) | low_key)
        decrypted_chars.append(chr(0x10000 + payload))
    return "".join(decrypted_chars)
def all_tables_info(conn: sqlite3.Connection) -> dict[str, list[str]]:
    """Collect the TEXT and BLOB columns of every table in the database.

    Table names are read from ``sqlite_master`` and each table's columns are
    inspected with ``PRAGMA table_info``. A column is selected when its
    declared type is ``TEXT`` or ``BLOB``, compared case-insensitively.

    Args:
        conn: Open SQLite connection to inspect.

    Returns:
        A mapping from table name to the list of selected column names.
        Tables without any such column are omitted.
    """
    table_rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table'"
    ).fetchall()
    suspicious_columns: dict[str, list[str]] = {}
    for table_name, *_ in table_rows:
        # Quote the identifier so names containing spaces, keywords or
        # quotes do not break the PRAGMA statement.
        quoted_table = '"' + table_name.replace('"', '""') + '"'
        table_columns = conn.execute(f"PRAGMA table_info({quoted_table})")
        for _, column_name, column_type, *_ in table_columns:
            # The declared type is reported as written in CREATE TABLE, so
            # normalise its case before comparing.
            if column_type.upper() in {"BLOB", "TEXT"}:
                suspicious_columns.setdefault(table_name, []).append(column_name)
    return suspicious_columns
def update_table(
    progress: rich.progress.Progress,
    conn: sqlite3.Connection,
    decryption_key: bytes,
    table_name: str,
    column_names: list[str],
):
    """Decrypt the given columns of every row in ``table_name`` in place.

    Each row is read by ``rowid``; ``bytes`` values go through
    ``decrypt_blob`` and ``str`` values through ``decrypt_text``; values of
    any other type (including ``None``) are written back unchanged. The
    function does not commit; that is left to the caller.

    Args:
        progress: Progress display that receives one task for this table.
        conn: Open SQLite connection used for both reading and writing.
        decryption_key: Key passed to the decrypt helpers.
        table_name: Table to process.
        column_names: Columns of ``table_name`` to decrypt.

    Returns:
        None. Nothing is done when ``column_names`` is empty or the table
        has no rows.
    """
    if not column_names:
        # Nothing to decrypt; an empty column list would also yield invalid SQL.
        return

    def quote_identifier(identifier: str) -> str:
        # Double-quote identifiers so keywords and special characters in
        # table/column names do not break the generated statements.
        return '"' + identifier.replace('"', '""') + '"'

    quoted_table = quote_identifier(table_name)
    quoted_columns = [quote_identifier(name) for name in column_names]
    select_query = f"SELECT rowid, {', '.join(quoted_columns)} FROM {quoted_table}"
    update_query = (
        f"UPDATE {quoted_table} SET "
        + ", ".join(f"{column} = ?" for column in quoted_columns)
        + " WHERE rowid = ?"
    )
    total_records, *_ = conn.execute(
        f"SELECT COUNT(*) FROM {quoted_table}"
    ).fetchone()
    if total_records <= 0:
        return
    task = progress.add_task("Updating", total=total_records)
    rich.print(f"[bold green]Updating[/bold green] [yellow]{table_name}[/yellow]")
    # NOTE(review): rows are updated while the SELECT cursor on the same
    # connection is still being iterated -- confirm this cannot revisit rows.
    for rowid, *columns in conn.execute(select_query):
        for index, column in enumerate(columns):
            if isinstance(column, bytes):
                columns[index] = decrypt_blob(column, decryption_key)
            elif isinstance(column, str):
                columns[index] = decrypt_text(column, decryption_key)
        try:
            conn.execute(update_query, (*columns, rowid))
        except (sqlite3.OperationalError, sqlite3.IntegrityError):
            # Best effort: report the failing row and continue with the rest.
            progress.log(
                f"[bold red]Error[/bold red] updating {table_name} {rowid=}, "
                f"{update_query=} {columns=}"
            )
        progress.update(task, advance=1)
    progress.remove_task(task)
if __name__ == "__main__":
    # Validate arguments explicitly: ``assert`` is stripped under ``python -O``.
    if len(sys.argv) != 3:
        sys.exit("Usage: python decrypt_chatbase.py <encrypted_file> <imei>")
    _, encrypted_file, key = sys.argv
    # Encode the key once instead of once per table.
    decryption_key = key.encode()
    connection = sqlite3.connect(encrypted_file)
    try:
        rich.print("[bold green]Connected to database[/bold green]")
        with rich.progress.Progress() as progress:
            tables_info = all_tables_info(connection)
            for table_name, columns in tables_info.items():
                update_table(progress, connection, decryption_key, table_name, columns)
        # Commit only after every table was processed.
        connection.commit()
    finally:
        # Close the connection even when processing fails part-way.
        connection.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment