Last active
February 2, 2024 04:20
-
-
Save JJTech0130/30940c45fb7b1de5c309dc4eb2874586 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
from rich import print | |
from rich.progress import track, Progress | |
from rich.prompt import Prompt | |
from rich.console import Console | |
from rich.table import Table | |
import zipfile | |
import os | |
import threading | |
import io | |
import lzma | |
import json | |
# Base URL of the AppleDB JSON API (device and build metadata).
APPLEDB_API = "https://api.appledb.dev/"
# Raw mirror of the AppleDB repository, used to fetch per-build osFiles JSON.
# NOTE(review): host "raw.githubusercontent.com" looks like a mangled
# raw.githubusercontent.com — verify before relying on this URL.
APPLEDB_RAW = "https://raw.githubusercontent.com/littlebyteorg/appledb/main/"
# Local directory for cached downloads and per-device OTA build lists.
CACHE_PATH = "cache/"
def download_fast(url: str) -> bytes:
    """Download *url* in parallel 100 MiB ranged chunks and return the bytes.

    The body is cached on disk under CACHE_PATH, keyed by the last path
    segment of the URL, so repeat calls are served from the cache file.
    Requires the server to honor HTTP Range requests and to report
    Content-Length on a HEAD request (raises KeyError otherwise).
    """
    os.makedirs(CACHE_PATH, exist_ok=True)
    cache_file = CACHE_PATH + url.split("/")[-1]
    if os.path.exists(cache_file):
        with open(cache_file, "rb") as f:
            return f.read()

    progress = Progress()
    chunk_size = 1024 * 1024 * 100  # 100 MiB per ranged request
    total_size = int(requests.head(url).headers["Content-Length"])
    # Ceil division. The original `total // chunk + 1` spawned one extra,
    # invalid request (Range "bytes=N-(N-1)") whenever total_size was an
    # exact multiple of chunk_size.
    number_of_chunks = (total_size + chunk_size - 1) // chunk_size
    task = progress.add_task("Downloading", total=number_of_chunks)

    # Chunks are keyed by their byte offset; threads only ever write their
    # own distinct key, so no lock is needed around the dict.
    parts = {}

    def download(start: int) -> None:
        # Last byte of this chunk, inclusive (HTTP Range bounds are inclusive).
        end = min(start + chunk_size, total_size) - 1
        resp = requests.get(url, headers={"Range": f"bytes={start}-{end}"})
        parts[start] = resp.content
        progress.advance(task)

    threads = []
    for i in range(number_of_chunks):
        t = threading.Thread(target=download, args=(i * chunk_size,))
        t.start()
        threads.append(t)
    progress.start()
    # Join threads back (order doesn't matter, you just want them all)
    for t in threads:
        t.join()
    progress.stop()

    # Reassemble the parts in byte-offset order.
    result = b"".join(parts[offset] for offset in sorted(parts))
    with open(cache_file, "wb") as f:
        f.write(result)
    return result
def main():
    """Interactive driver: pick a device and build, download the full-OTA zip,
    parse the pbzx payload inside it, and report where a chosen file lives so
    it can later be fetched with a single HTTP Range request.
    """
    console = Console()
    # Ask for desired device type
    device = Prompt.ask(":mobile_phone: Device type", console=console, default="iPhone5,1")
    # Make sure the device is a real device using the AppleDB API
    device_info = requests.get(APPLEDB_API + f"device/{device}.json")
    if device_info.status_code != 200:
        print("Device not found.")
        return
    # Get giant build list
    builds = requests.get(APPLEDB_API + "ios/iOS/main.json").json()
    # Check if [device].json exists (cached list of full-OTA builds for this device)
    if not os.path.exists(CACHE_PATH + f"{device}.json"):
        ota_builds = []
        builds = [build for build in builds if build["osType"] == "iOS" and device in build["deviceMap"]]
        for build in track(builds, description="Checking builds for OTA updates"):
            # Use raw API to check for full OTA
            build_raw_resp = requests.get(APPLEDB_RAW + f"osFiles/iOS/{build['build'][:2]}x%20-%20{build['version'].split('.')[0]}.x/{build['build']}.json")
            if build_raw_resp.status_code != 200:
                continue
            build_raw = build_raw_resp.json()
            if not "sources" in build_raw:
                continue
            # A "full" OTA has no prerequisiteBuild, i.e. it is not a delta update.
            ota_sources = [source for source in build_raw["sources"] if device in source["deviceMap"] and source["type"] == "ota" and "prerequisiteBuild" not in source]
            if len(ota_sources) > 0:
                # Take the first active mirror of the first matching source.
                link = [link for link in ota_sources[0]["links"] if link["active"] == True][0]
                ota_builds.append((build["build"], build["version"], link["url"]))
        with open(CACHE_PATH + f"{device}.json", "w") as f:
            f.write(json.dumps(ota_builds))
    else:
        with open(CACHE_PATH + f"{device}.json", "r") as f:
            ota_builds = json.loads(f.read())
    print(f"Found {len(ota_builds)} full OTA updates for {device}")
    table = Table(title="OTA Updates")
    table.add_column("Build")
    table.add_column("Version")
    table.add_column("URL")
    for build in ota_builds:
        table.add_row(*build)
    console.print(table)
    # # Ask for desired build
    build_name = Prompt.ask(":package: Build", console=console, choices=[build[0] for build in ota_builds], default=ota_builds[0][0], show_choices=False)
    #build_name = "test"
    build_url = [build for build in ota_builds if build[0] == build_name][0][2]
    #build_url = "https://secure-appldnld.apple.com/ios10.2/031-94086-20161207-F7C9D786-BBFA-11E6-9985-663182FDB0CC/com_apple_MobileAsset_SoftwareUpdate/b78bd2674b358c518f8b0703df9047c88ff7218a.zip"
    build_file = download_fast(build_url)
    z = zipfile.ZipFile(io.BytesIO(build_file))
    if "AssetData/payloadv2/payload" in z.namelist():
        print("Found payloadv2")
        print("Payload offset in ZIP: " + hex(z.getinfo("AssetData/payloadv2/payload").header_offset + len(z.getinfo("AssetData/payloadv2/payload").FileHeader()) + 4)) # Not sure what the 4 is for but it makes it line up
        payload = z.read("AssetData/payloadv2/payload")
        #print(payload[:10])
        payload = io.BytesIO(payload)
        # Parse pbzx header: 4-byte magic, then an 8-byte big-endian chunk size.
        assert payload.read(4) == b"pbzx"
        chunk_size = int.from_bytes(payload.read(8), "big")
        #print(f"Chunk size: {hex(chunk_size)}")
        chunks = []
        # Parse the chunks: each entry is (decompressed_length, compressed_length)
        # as two 8-byte big-endian ints, followed by the compressed data.
        # `offset` records where the entry header starts within the payload.
        while True:
            offset = payload.tell()
            decompressed_length = int.from_bytes(payload.read(8), "big")
            # A zero length (including EOF, where read() returns b"") terminates.
            if decompressed_length == 0:
                break
            length = int.from_bytes(payload.read(8), "big")
            payload.read(length) # Skip the compressed data
            chunks.append((offset, length))
        # Check if we have cached the decompressed archive
        cache_file = CACHE_PATH + build_url.split("/")[-1] + ".decompressed"
        if os.path.exists(cache_file):
            with open(cache_file, "rb") as f:
                archive = f.read()
        else:
            archive = b""
            for (offset, length) in track(chunks, description="Decompressing chunks"):
                payload.seek(offset + 16) # Skip the header (16 bytes)
                chunk = lzma.decompress(payload.read(length))
                archive += chunk
            with open(cache_file, "wb") as f:
                f.write(archive)
        print(archive[:10])
        archive = io.BytesIO(archive)
        # Parse the files. Each entry appears to have a fixed 30-byte header
        # (6 skipped + 4-byte length + 12 skipped + 2-byte name length +
        # 6 skipped) followed by the name and then the file data.
        # NOTE(review): header field meanings are inferred from the read
        # sizes only; unverified against a format spec.
        files = []
        while True:
            archive.read(6)
            length = int.from_bytes(archive.read(4), "big")
            #print(f"File offset: {hex(offset)}")
            #print(f"File length: {hex(length)}")
            archive.read(12)
            name_length = int.from_bytes(archive.read(2), "big")
            archive.read(6)
            name = archive.read(name_length).decode()
            #print(f"File name: {name}")
            offset = archive.tell()
            archive.read(length)
            files.append((name, offset, length))
            if archive.tell() == archive.getbuffer().nbytes:
                break
        # Ask if we should write available files to disk
        from rich.prompt import Confirm
        if Confirm.ask("Write list of files to disk?", default=False):
            with open(f"{build_name}_files.txt", "w") as f:
                for (name, offset, length) in files:
                    f.write(f"{name} @ {hex(offset)} : {hex(length)}\n")
        # Ask for desired file
        file_name = Prompt.ask(":open_file_folder: File", console=console, choices=[file[0] for file in files], default="System/Library/PrivateFrameworks/IDS.framework/identityservicesd.app/identityservicesd", show_choices=False)
        file = [file for file in files if file[0] == file_name][0]
        print(file)
        # Calculate chunk number and offset.
        # NOTE(review): this assumes every chunk decompresses to exactly
        # chunk_size bytes (decompressed_length is never checked) — confirm.
        start_chunk = file[1] // chunk_size # Chunk number, file may start partway into chunk due to floor division
        # Look up the offset for that chunk
        chunk_offset = chunks[start_chunk][0]
        # Calculate the offset into the chunk
        file_offset = file[1] % chunk_size
        # Check if the file is split across multiple chunks
        if file_offset + file[2] > chunk_size:
            print("File is split across multiple chunks")
        print(f"Chunk number: {start_chunk}")
        print(f"Chunk offset: {hex(chunk_offset + 16)}") # Remove the 16 byte header
        print(f"Chunk length: {hex(chunks[start_chunk][1])}") # Note header is not removed, I think this overreads?
        print(f"File offset: {hex(file_offset)}")
        print(f"File length: {hex(file[2])}")
        archive.seek(file[1])
        print(f"file sanity check: {archive.read(10)}")

if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Extract one file (identityservicesd) from inside a pbzx chunk of an Apple
OTA zip using a single HTTP Range request, without downloading the zip.

The offsets below were discovered with the companion exploration script.
"""
import lzma
import sys

import requests

session = requests.Session()

URL = "http://appldnld.apple.com/ios10.3.3/091-23111-20170719-B3106482-697B-11E7-BDE5-4C9500BA0AE3/com_apple_MobileAsset_SoftwareUpdate/9e1777da1d4a8d6917de9cfa115f253aab6098c7.zip"
PBZ_IN_ZIP_OFFSET = 0x5a408fa     # start of the pbzx payload data inside the zip
PBZ_CHUNK_OFFSET = 0x3fe01da4     # offset of the target xz chunk within the payload
PBZ_CHUNK_LENGTH = 0x35fa94       # compressed length of that chunk
FILE_IN_CHUNK_OFFSET = 0x15110f   # file start within the decompressed chunk
FILE_LENGTH = 0x5c3940            # file length in bytes

req_begin = PBZ_IN_ZIP_OFFSET + PBZ_CHUNK_OFFSET
req_end = req_begin + PBZ_CHUNK_LENGTH
# Fix: go through the Session created above — the original built a Session
# but then issued the request with plain requests.get(), never using it.
# (HTTP Range bounds are inclusive, hence req_end - 1.)
resp = session.get(URL, headers={
    "Range": f"bytes={req_begin}-{req_end-1}"
})
# 206 Partial Content means the server honored the Range header.
if resp.status_code != 206:
    print("Request failed")
    sys.exit(1)
# this reads the whole chunk into memory,
# but it's only 16MB so meh
raw_chunk = lzma.decompress(resp.content, lzma.FORMAT_XZ)
with open("identityservicesd", "wb") as fp:
    fp.write(raw_chunk[FILE_IN_CHUNK_OFFSET : FILE_IN_CHUNK_OFFSET+FILE_LENGTH])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment