Created
November 16, 2022 01:55
-
-
Save malicious/cd2a17f1ace34d27a8c710892721b28d to your computer and use it in GitHub Desktop.
Re-encrypts an iOS backup that's been decrypted by mvt-ios.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Re-encrypts an iOS backup that's been decrypted by mvt-ios. | |
Depending on the iOS version of the backup, there may be files that were never | |
encrypted in the first place, which mvt-ios then skips over during decryption. | |
""" | |
import hashlib | |
import itertools | |
import json | |
import mmap | |
import os | |
import pathlib | |
import plistlib | |
import sqlite3 | |
import struct | |
import sys | |
from pprint import pprint | |
from typing import Dict, List, Optional | |
import NSKeyedUnArchiver | |
import keyring | |
from Crypto.Cipher import AES | |
def _aes_unwrap(kek, wrapped_key):
    """AES key unwrap (RFC 3394): recover the plaintext key from ``wrapped_key``
    using the key-encryption key ``kek``.

    Returns the unwrapped key bytes, or None when the integrity check fails
    (wrong KEK or corrupted data) — callers treat None as "try another key".
    """
    def pack_64b(s):
        return struct.pack(">Q", s)

    def unpack_64b(s):
        return struct.unpack(">Q", s)[0]

    # Split the wrapped blob into 64-bit blocks: C[0] is the IV block,
    # C[1:] carry the wrapped key material.
    C = [unpack_64b(wrapped_key[i * 8:i * 8 + 8])
         for i in range(len(wrapped_key) // 8)]
    n = len(C) - 1
    R = [0] * (n + 1)
    A = C[0]
    for i in range(1, n + 1):
        R[i] = C[i]
    # ECB here is just the raw block cipher the unwrap algorithm requires.
    # Create the cipher once instead of once per block (was rebuilt 6*n times).
    cipher = AES.new(kek, AES.MODE_ECB)
    for j in reversed(range(0, 6)):
        for i in reversed(range(1, n + 1)):
            todec = pack_64b(A ^ (n * j + i))
            todec += pack_64b(R[i])
            B = cipher.decrypt(todec)
            A = unpack_64b(B[:8])
            R[i] = unpack_64b(B[8:])
    # RFC 3394 integrity check: A must unwind back to the fixed IV constant.
    if A != 0xa6a6a6a6a6a6a6a6:
        # print(f"AES decryption integrity check failed, key IV: {A}")
        return None
    return b"".join(map(pack_64b, R[1:]))
# Check the SHA1 of a given file to confirm it's what we expected | |
def _recompute_file_hash(filepath): | |
buffer_size = 128 * 1024 | |
sha1 = hashlib.sha1() | |
with open(filepath, 'rb') as f: | |
while True: | |
data = f.read(buffer_size) | |
if not data: | |
break | |
sha1.update(data) | |
return sha1.digest() | |
class PasscodeCacher:
    """
    Passcodes are stored as a dict that maps from passcode_str to passcode_keys (bytearray).
    - "None" is permitted as a dictionary key, for if we don't have the original passcode_str
    - A list of passcode_keys is stored as the value, for times when the encryption algorithm changes
    - More-recent passcode_key entries are appended to the end of the list. Try not to depend on this.
    """
    # Service name under which the cache is stored in the OS keyring.
    KEYRING_IDENTIFIER: str = "iOS backup passcodes / re-encrypt.py"

    def __init__(self):
        # Maps passcode_str (or None) -> list of hex-encoded passcode keys.
        self.passcodes: Dict[str, List] = {}

    def load_from_keyring(self):
        """Merge any previously-saved passcode cache from the OS keyring."""
        encoded_passcodes = keyring.get_password(PasscodeCacher.KEYRING_IDENTIFIER, None)
        if encoded_passcodes:
            stored_passcodes = json.loads(encoded_passcodes)
            self.passcodes.update(stored_passcodes)

    def save_to_keyring(self):
        """Persist the current passcode cache to the OS keyring as JSON."""
        encoded_passcodes = json.dumps(self.passcodes)
        keyring.set_password(PasscodeCacher.KEYRING_IDENTIFIER, None, encoded_passcodes)

    def has(self, key):
        return key in self.passcodes

    def add_one(self, key, value):
        """Record ``value`` (a passcode key, bytes) under ``key``, deduplicating."""
        passcode_keys = self.passcodes.setdefault(key, [])
        passcode_keys.append(value.hex())
        # Remove duplicate passcode_keys.  dict.fromkeys preserves insertion
        # order, so "more-recent entries are at the end" (class docstring)
        # stays true; the previous list(set(...)) shuffled the order
        # arbitrarily, which made get_one() nondeterministic.
        self.passcodes[key] = list(dict.fromkeys(passcode_keys))

    def get_one(self, item):
        # The most recently added key sits at the end of the list.
        return bytes.fromhex(self.passcodes[item][-1])

    def values(self) -> List:
        """Return every cached passcode key (as bytes), across all passcodes."""
        # Return a real list to match the annotation (previously a lazy map object).
        return [bytes.fromhex(h)
                for h in itertools.chain.from_iterable(self.passcodes.values())]
class ManifestPlist:
    """Parses Manifest.plist's BackupKeyBag and unlocks its per-class keys."""

    # This is used to store the unwrapped keys in a "fake" keybag entry, because
    # the keybag is already a per-protection class kind of data structure.
    #
    # Not a great practice, but every other piece of iOS code does it.
    #
    UNWRAPPED_KEY_ENTRY = b"unlocked-WPKY"

    def __init__(self, backup_dir):
        self.mpl_path = os.path.join(backup_dir, "Manifest.plist")
        self.passcode_cacher = PasscodeCacher()
        self.passcode_cacher.load_from_keyring()

    def expects_encrypted_backup(self):
        """Report whether Manifest.plist claims the backup is encrypted."""
        with open(self.mpl_path, 'rb') as f:
            manifest = plistlib.load(f)
        expects_encrypted = manifest['IsEncrypted']
        print(f"[INFO] Manifest.plist reports IsEncrypted={expects_encrypted}")
        return expects_encrypted

    @staticmethod
    def _loop_tlv_blocks(blob):
        """Yield (tag, data) pairs from the TLV-encoded keybag blob.

        4-byte payloads are unpacked to big-endian ints; everything else is
        yielded as raw bytes.
        """
        i = 0
        while i + 8 <= len(blob):
            # First 4 bytes are the identifier for this block
            tag = blob[i:i + 4]
            # Next 4 bytes are the length (most significant byte first)
            # (length of 4 means the total block length is 0xc bytes, but this is handled by the caller)
            reported_length = struct.unpack(">L", blob[i + 4:i + 8])[0]
            data = blob[i + 8:i + 8 + reported_length]
            if len(data) == 4:
                data = struct.unpack(">L", data)[0]
            yield (tag, data,)
            # Iterate to the next block
            i += 8 + reported_length

    def parse_keybag(self):
        """Parse the BackupKeyBag blob out of Manifest.plist.

        Populates:
          - self.keybag_attrs: keybag-wide attributes (TYPE, SALT, ITER, ...)
          - self.parsed_keybag_keys: per-protection-class key dicts, keyed by CLAS
        """
        with open(self.mpl_path, 'rb') as infile:
            manifest = plistlib.load(infile)
        keybag_attrs = {}
        keybag_uuid = None
        keybag_wrap = None
        parsed_keybag_keys = {}
        current_key = None

        def _close_key_class(current_key_dict):
            # File the finished per-class key dict under its CLAS identifier.
            if not current_key_dict:
                return
            key_class = current_key_dict[b"CLAS"]
            parsed_keybag_keys[key_class] = current_key_dict

        # tag/data are in a flat multidict, so parse them into a more-hierarchical structure
        #
        # - start of the multidict is usually VERS + TYPE, keep those in "keybag_attrs"
        # - UUID usually represents the start of a new key
        # - TODO: extract key info for use with hashcat
        #
        # TODO: Info in https://stackoverflow.com/questions/1498342/ is much more detailed.
        #
        backup_keybag = manifest['BackupKeyBag']
        for tag, data in ManifestPlist._loop_tlv_blocks(backup_keybag):
            # print(f"{tag}: {data}")
            if tag == b"TYPE":
                assert not current_key
                keybag_attrs[b"TYPE"] = data
            elif tag == b"UUID" and keybag_uuid is None:
                # The very first UUID belongs to the keybag itself, not a key.
                assert not current_key
                keybag_uuid = data.hex()
            elif tag == b"WRAP" and keybag_wrap is None:
                assert not current_key
                keybag_wrap = data
            elif tag == b"UUID":
                # Use UUID tag to denote the start of a new key
                # …which means we wrap up parsing of the current key
                _close_key_class(current_key)
                current_key = {}
                current_key[b"UUID"] = data.hex()
            elif tag == b"CLAS":
                current_key[b"CLAS"] = data
            elif tag == b"KTYP":
                current_key[b"KTYP"] = data
            elif tag == b"WRAP":
                current_key[b"WRAP"] = data
            elif tag == b"WPKY":
                current_key[b"WPKY"] = data
            elif tag in [b"DPSL", b"HMCK", b"SALT"]:
                # Binary attributes are stored hex-encoded (keeps them JSON-friendly).
                keybag_attrs[tag] = data.hex()
            else:
                keybag_attrs[tag] = data
        # Close out the in-progress key, if we started one
        _close_key_class(current_key)
        self.keybag_attrs = keybag_attrs
        self.parsed_keybag_keys = parsed_keybag_keys

    def _compute_passcode_key(self, passcode_str):
        """Derive the keybag key-encryption key from a passcode (two-step PBKDF2)."""
        passcode = passcode_str.encode('utf-8')
        # Step 1: PBKDF2-SHA256 with the DPSL salt and DPIC iteration count.
        passcode_key_step1 = hashlib.pbkdf2_hmac('sha256',
                                                 passcode,
                                                 bytes.fromhex(self.keybag_attrs[b"DPSL"]),
                                                 self.keybag_attrs[b"DPIC"],
                                                 32)
        # Step 2: PBKDF2-SHA1 with the SALT and ITER values.
        passcode_key_step2 = hashlib.pbkdf2_hmac(
            'sha1',
            passcode_key_step1,
            bytes.fromhex(self.keybag_attrs[b"SALT"]),
            self.keybag_attrs[b"ITER"],
            32)
        return passcode_key_step2

    def _try_unlock_keys(self, passcode_key):
        """Try to unwrap every WPKY in the keybag with ``passcode_key``.

        Successful unwraps are stashed under UNWRAPPED_KEY_ENTRY in each key
        dict.  Returns the number of keys unwrapped.
        """
        # Names for the b"WRAP" field in BackupKeyBag
        WRAP_DEVICE = 1
        WRAP_PASSCODE = 2
        successful_unwraps = 0
        for key_class, key_dict in self.parsed_keybag_keys.items():
            if b"WPKY" not in key_dict:
                continue
            assert key_dict[b"WRAP"] & WRAP_PASSCODE
            unwrapped_key = _aes_unwrap(passcode_key, key_dict[b"WPKY"])
            if not unwrapped_key:
                if b"DPSL" not in self.keybag_attrs:
                    print(f"[WARN] \"DPSL\" not found in keybag, is the backup unencrypted?")
                if b"DPIC" not in self.keybag_attrs:
                    print(f"[WARN] \"DPIC\" not found in keybag, is the backup unencrypted?")
                # TODO: Should we stop trying to unwrap as soon as a decryption fails?
                continue
            key_dict[ManifestPlist.UNWRAPPED_KEY_ENTRY] = unwrapped_key
            successful_unwraps += 1
        return successful_unwraps

    def unlock_keys(self, passcode_str: Optional[str]) -> Dict:
        """Unlock the keybag's per-class keys, trying cached keys first.

        Returns self.parsed_keybag_keys (with unwrapped keys added), or raises
        ValueError when no known passcode or cached key can decrypt the keybag.
        """
        # Try existing keys first, since it's unlikely we really need a new one
        passcode_keys_to_try = self.passcode_cacher.values()
        for passcode_key in passcode_keys_to_try:
            successful_unwraps = self._try_unlock_keys(passcode_key)
            if successful_unwraps < 1:
                continue
            if passcode_str:
                # Only store the passcodes and keys after we've successfully used them.
                self.passcode_cacher.add_one(passcode_str, passcode_key)
                self.passcode_cacher.save_to_keyring()
            else:
                print(f"[INFO] Successfully unwrapped with existing passcode_key: {passcode_key.hex()}")
            # Succeeded here, exit
            return self.parsed_keybag_keys
        # If those didn't work, see if we maybe need to compute a new one
        # This also handles the case where the algorithm changed (e.g. new SALT in the keybag)
        if passcode_str:
            passcode_key = self._compute_passcode_key(passcode_str)
            successful_unwraps = self._try_unlock_keys(passcode_key)
            if successful_unwraps > 0:
                self.passcode_cacher.add_one(passcode_str, passcode_key)
                self.passcode_cacher.save_to_keyring()
                # BUG FIX: previously control fell through to the ValueError
                # below even though the freshly computed passcode key worked.
                return self.parsed_keybag_keys
        # TODO: Also try recomputing new passcode_keys based on old passcode_str's + new parameters
        # Went through all known passcodes, couldn't figure anything out
        raise ValueError("Failed to decrypt keybag, try a different passcode")
def _mp_encrypt(mdb, row, column_names, output_dir): | |
row_data = dict(zip(column_names, row)) | |
# print(f"[DEBUG] Starting encrypt: {row_data['fileID']}") | |
return mdb._try_encrypt_file(row_data, output_dir) | |
class ManifestDb:
    """Wraps the backup's Manifest.db and re-encrypts the files it lists."""

    def __init__(self, backup_dir):
        self.backup_dir = backup_dir
        self.mdb_path = os.path.join(backup_dir, "Manifest.db")
        self.mdb_conn = sqlite3.connect(self.mdb_path)
        if not ManifestDb.is_manifest_db_decrypted(self.mdb_path):
            raise ValueError(f"Couldn't open file: {self.mdb_path}")
        # Used later; call self.set_protection_class_key() once ManifestPlist is unlocked.
        self.unlocked_keybag_keys = {}

    @staticmethod
    def is_manifest_db_decrypted(mdb_path):
        """
        Check that the given directory is a partially-decrypted backup.
        Partially-decrypted means:
        - filesystem files are in plaintext (file contents are actually readable with e.g. `file`)
        - Manifest.db contents have not been updated (because decrypting means new backup keys)
        - (optional) Manifest.plist still reports IsEncrypted, since it was copied directly from an encrypted dir
        """
        # First check: is Manifest.db encrypted?
        conn = sqlite3.connect(mdb_path)
        cur = conn.cursor()
        try:
            # A still-encrypted Manifest.db is not a valid SQLite file, so any
            # real query against it raises DatabaseError.
            cur.execute("SELECT fileID FROM Files LIMIT 1")
        except sqlite3.DatabaseError:
            print(f"[WARN] Can't access Manifest.db contents, treating as encrypted: {mdb_path}")
            return False
        cur.close()
        conn.close()
        return True

    def dump_files_table(self, row_limit=None):
        """Pretty print the file contents, mostly useful for git diff of an iOS backup"""
        cur = self.mdb_conn.cursor()
        query_str = "SELECT * FROM Files ORDER BY fileID"
        if row_limit:
            query_str += f" LIMIT {row_limit}"
        cur.execute(query_str)
        column_names = [col[0] for col in cur.description]
        for row in cur:
            row_data = dict(zip(column_names, row))
            if row_data['file']:
                # prep to pretty-print the whole field
                row_data['file'] = plistlib.loads(row_data['file'])
                # prep what seems to be a file hash
                # NOTE(review): the $class UID values 6 and 4 presumably
                # distinguish NSKeyedArchiver layouts — confirm against real data.
                objects_dict = row_data['file']['$objects']
                if objects_dict[1]['$class'] == plistlib.UID(6):
                    if isinstance(objects_dict[3], (bytes,)):
                        objects_dict[3] = objects_dict[3].hex()
                if objects_dict[1]['$class'] == plistlib.UID(4):
                    if isinstance(objects_dict[3], (bytes,)):
                        objects_dict[3] = objects_dict[3].hex()
            pprint(row_data)
        cur.close()

    def set_protection_class_key(self, protection_class_id, key):
        # Register an unlocked keybag key (from ManifestPlist) for later unwrap() calls.
        self.unlocked_keybag_keys[protection_class_id] = key

    def unwrap(self, protection_class_id, wrapped_key):
        """Unwrap a per-file key using the unlocked key for its protection class."""
        # 0x28 = 40 bytes: a wrapped 32-byte AES key plus the 8-byte IV block.
        if len(wrapped_key) != 0x28:
            raise ValueError(f"[ERROR] Invalid key length: {len(wrapped_key)}")
        return _aes_unwrap(self.unlocked_keybag_keys[protection_class_id], wrapped_key)

    @staticmethod
    def _just_encrypt_file(source_f, target_f, reencryptor, target_size, needs_padding=True):
        """Encrypt source_f into target_f chunk-by-chunk, padding the tail.

        NOTE(review): `target_size` is only used to special-case empty files;
        the loop otherwise reads until the mmap runs out.
        """
        # If source_f is an empty file, we actually still want to pad it with data.
        if target_size == 0:
            mmapped_source = []
        # Mmap the decrypted file, for speed
        elif os.name == 'nt':
            mmapped_source = mmap.mmap(source_f.fileno(), length=0, access=mmap.ACCESS_READ)
        else:
            mmapped_source = mmap.mmap(source_f.fileno(), length=0, prot=mmap.PROT_READ)
        # Write the reencrypted file to disk
        current_chunk_index = 0
        # 16 MB chunks; a multiple of the AES block size, so only the final
        # partial chunk can ever need padding.
        chunk_size = 16 * 1000 * 1000
        while True:  # current_chunk_index * chunk_size < target_size:
            # print(f"[DEBUG] Encrypting file from byte {current_chunk_index * chunk_size}")
            current_chunk = mmapped_source[
                current_chunk_index * chunk_size:(current_chunk_index + 1) * chunk_size]
            if len(current_chunk) == 0:
                break
            if len(current_chunk) % 16:
                # Final partial chunk: pad in-line to the AES block size.
                padding_size = 16 - (len(current_chunk) % 16)
                current_chunk = current_chunk + padding_size.to_bytes(1, sys.byteorder) * padding_size
                needs_padding = False
            target_f.write(reencryptor.encrypt(current_chunk))
            current_chunk_index += 1
        if needs_padding:
            # Source length was an exact multiple of 16 (or zero), so no in-line
            # padding happened: append a full padding block.  current_chunk is
            # the empty slice here, so padding_size is always 16.
            padding_size = 16 - (len(current_chunk) % 16)
            current_chunk = padding_size.to_bytes(1, sys.byteorder) * padding_size
            target_f.write(reencryptor.encrypt(current_chunk))

    def _try_encrypt_file(self, row_data, output_dir):
        """Re-encrypt one file described by a Manifest.db row into output_dir.

        Skips rows with no source file or an already-matching target; raises
        ValueError when the re-encrypted output's SHA-1 doesn't match the
        digest recorded in Manifest.db.
        """
        try:
            file_manifest = NSKeyedUnArchiver.unserializeNSKeyedArchiver(row_data['file'])
        except TypeError:
            if not len(row_data['file']):
                # No metadata blob at all — nothing to encrypt for this row.
                return
            print(f"[ERROR] Couldn't parse Manifest.db data for {row_data['fileID']}")
            return
        # Confirm that we have a source file to decrypt
        # (backup layout: files live under a two-hex-char prefix directory)
        source_path = os.path.join(self.backup_dir, row_data['fileID'][0:2], row_data['fileID'])
        if not os.path.exists(source_path):
            return
        # And that we have a target file to encrypt to
        target_path = os.path.join(output_dir, row_data['fileID'][0:2], row_data['fileID'])
        if 'Digest' in file_manifest and os.path.exists(target_path):
            # Check if the output file was already created + identical
            computed_hash = _recompute_file_hash(target_path)
            recorded_hash = file_manifest['Digest']
            if computed_hash == recorded_hash:
                print(f"[DEBUG] Skipping encryption, target file already matches: {row_data['fileID']}, {file_manifest['Size']} bytes")
                return
            else:
                print(f"[WARN] File already exists, but hash doesn't match: {row_data['fileID']}")
        target_path_parent = os.path.dirname(target_path)
        if not os.path.exists(target_path_parent):
            pathlib.Path(target_path_parent).mkdir(parents=False, exist_ok=True)
        # And collect the encryption key to decrypt with
        # NOTE(review): the first 4 bytes of EncryptionKey are skipped —
        # presumably a protection-class prefix; confirm against the backup format.
        file_encryption_key = file_manifest['EncryptionKey'][4:]
        with open(source_path, 'rb') as source_f:
            with open(target_path, 'wb') as target_f:
                key = self.unwrap(file_manifest['ProtectionClass'], file_encryption_key)
                # Per-file AES-CBC with an all-zero IV (see the b'\x00' * 16 literal).
                reencryptor = AES.new(key, AES.MODE_CBC, b'\x00' * 16)
                # print(f"[DEBUG] Encrypting fileID {row_data['fileID']} / {file_manifest['RelativePath']}, {file_manifest['Size']} bytes")
                ManifestDb._just_encrypt_file(source_f, target_f, reencryptor, file_manifest['Size'])
        # No hash to compute, or care about
        if 'Digest' not in file_manifest:
            return
        computed_hash = _recompute_file_hash(target_path)
        recorded_hash = file_manifest['Digest']
        if computed_hash != recorded_hash:
            row_data['file'] = file_manifest
            print()
            print(
                f"[ERROR] Failed to re-encrypt fileID \"{row_data['fileID']}\":\n"
                f" checksum of original file: {_recompute_file_hash(source_path).hex()}\n"
                f" checksum of encrypted output: {computed_hash.hex()}\n"
                f" checksum recorded in Manifest.db: {recorded_hash.hex()}")
            print(
                f"[DEBUG] Check if file sizes match for \"{file_manifest['RelativePath']}\":\n"
                f" size of original, decrypted file: {os.path.getsize(source_path)}\n"
                f" size of encrypted output file: {os.path.getsize(target_path)}\n"
                f" size recorded in Manifest.db: {file_manifest['Size']}")
            pprint(row_data)
            raise ValueError(f"SHA1 checksums don't match, failed to re-encrypt")

    def reencrypt_one(self, file_id, output_dir):
        """Re-encrypt a single file by its fileID."""
        cur = self.mdb_conn.cursor()
        cur.execute("SELECT * FROM Files WHERE fileID = ?", (file_id,))
        column_names = [col[0] for col in cur.description]
        for row in cur:
            row_data = dict(zip(column_names, row))
            self._try_encrypt_file(row_data, output_dir)
        cur.close()

    def reencrypt_all(self, output_dir):
        """Re-encrypt every file listed in Manifest.db, serially."""
        cur = self.mdb_conn.cursor()
        cur.execute("SELECT * FROM Files ORDER BY FileID")
        column_names = [col[0] for col in cur.description]
        failed_reencryptions = 0
        succeeded_reencryptions = 0
        for row in cur:
            row_data = dict(zip(column_names, row))
            try:
                # print(f"[DEBUG] Trying to encrypt file: {row_data['fileID']}")
                self._try_encrypt_file(row_data, output_dir)
                succeeded_reencryptions += 1
            except (ValueError, NotImplementedError,) as e:
                # Keep going: one bad file shouldn't abort the whole backup.
                failed_reencryptions += 1
                print(f"[ERROR] {row_data['fileID']}: {e}")
        if failed_reencryptions > 0:
            print(f"[INFO] Failed to encrypt {failed_reencryptions} files (total {failed_reencryptions + succeeded_reencryptions})")
        cur.close()

    def reencrypt_threaded(self, output_dir, num_threads=3):
        """Re-encrypt every file using a process pool, one future per row.

        NOTE(review): `num_threads` is currently unused — the default
        ProcessPoolExecutor sizing applies.  The executor is also never
        shut down and self.mdb_conn is not restored afterwards.
        """
        cur = self.mdb_conn.cursor()
        # NB Hide the SQLite connection while we're doing multiprocessing.
        # If we want to use the class again, set it after the futures have finished running.
        # (Connection objects can't be pickled, and `self` is pickled per submit().)
        self.mdb_conn = None
        cur.execute("SELECT * FROM Files ORDER BY FileID")
        column_names = [col[0] for col in cur.description]
        import concurrent.futures
        executor = concurrent.futures.ProcessPoolExecutor()
        futures_list = []
        for row in cur:
            future = executor.submit(_mp_encrypt, self, row, column_names, output_dir)
            futures_list.append(future)
        for future in concurrent.futures.as_completed(futures_list):
            if future.exception():
                print(future.exception())
        cur.close()
if __name__ == '__main__':
    # Usage: re-encrypt.py <backup_dir> <output_dir> [passcode]
    backup_dir = sys.argv[1]
    output_dir = sys.argv[2]
    passcode_str = None
    if len(sys.argv) >= 4:
        passcode_str = sys.argv[3]
    # Flip to True for verbose dumps of Manifest.db / Manifest.plist contents.
    print_debug_info = False
    mdb = ManifestDb(backup_dir)
    if print_debug_info:
        print('=' * 72)
        print('Dumping Manifest.db info')
        print('=' * 72)
        print()
        mdb.dump_files_table(row_limit=2)
        print('[WARN] truncating entries past limit')
        print()
    mpl = ManifestPlist(backup_dir)
    mpl.parse_keybag()
    if print_debug_info:
        print('=' * 72)
        print('Dumping Manifest.plist info')
        print('=' * 72)
        print()
        pprint(mpl.keybag_attrs)
        pprint(mpl.parsed_keybag_keys)
        print()
    # Unlock the keybag, then hand each protection-class key to the db wrapper.
    unlocked_keys = mpl.unlock_keys(passcode_str)
    for key_class, key_dict in unlocked_keys.items():
        mdb.set_protection_class_key(key_class, key_dict[ManifestPlist.UNWRAPPED_KEY_ENTRY])
    print('=' * 72)
    print(f'Re-encrypting files into {output_dir}')
    print('=' * 72)
    print()
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    # mdb.reencrypt_all(output_dir)
    mdb.reencrypt_threaded(output_dir)
    print()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment