Created
August 7, 2022 02:38
-
-
Save WitherOrNot/5e19aa2f583bf107a3f530025ff7be30 to your computer and use it in GitHub Desktop.
Archive Minecraft Marketplace content. Requires an entity token in token.txt and a device ID set in the script; both can be obtained by running Fiddler on Minecraft Bedrock with the owner account logged in.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from base64 import b64decode, b64encode | |
from binascii import hexlify | |
from requests import post, get | |
from json import loads, dumps | |
from Crypto.Cipher import AES | |
from os import makedirs | |
from os.path import join, exists, dirname | |
from zipfile import ZipFile | |
from io import BytesIO | |
# Placeholder: replace with the real device ID before running.
# The value is sent as ReceiptData.DeviceId in the inventory request and is
# XORed with the receipt's EntityId to derive the per-user key.
DEVICE_ID = "<device id>"
def tobytes(s):
    """Return *s* as bytes, UTF-8 encoding it when it is a ``str``.

    Any value that is not a ``str`` (typically ``bytes``) is returned
    unchanged.
    """
    # isinstance rather than an exact type comparison, so that str
    # subclasses are encoded as well.
    if isinstance(s, str):
        return s.encode("utf-8")
    return s
def tostr(s):
    """Return *s* as a ``str``.

    ``bytes`` values are decoded as UTF-8; anything else is passed
    through ``str()``.

    Raises:
        UnicodeDecodeError: if *s* is bytes that are not valid UTF-8.
    """
    # isinstance rather than an exact type comparison, so that bytes
    # subclasses are decoded instead of being rendered as "b'...'".
    if isinstance(s, bytes):
        return s.decode("utf-8")
    return str(s)
def readstr(f):
    """Read a NUL-terminated string from the binary stream *f*.

    Bytes are consumed one at a time from the current position, up to and
    including the terminating zero byte, which is not part of the result.
    If the stream ends before a zero byte is seen, the bytes read so far
    are returned rather than raising ``IndexError``.

    Each byte becomes the character with the same code point (Latin-1).
    """
    collected = bytearray()
    while True:
        c = f.read(1)
        # b"" signals end of stream; a zero byte is the terminator.
        if not c or c[0] == 0:
            break
        collected += c
    return collected.decode("latin-1")
def b64bdec(s):
    """Decode base64 text *s* to bytes, tolerating missing ``=`` padding.

    Both the standard alphabet (``+`` and ``/``) and the URL-safe alphabet
    (``-`` and ``_``) are accepted. *s* may be a ``str`` or ASCII ``bytes``.

    Raises:
        binascii.Error: if the padded input is not valid base64.
    """
    if isinstance(s, bytes):
        s = s.decode("ascii")
    # b64decode silently drops characters outside the standard alphabet, so
    # URL-safe input must be mapped to it first or data would be lost.
    s = s.replace("-", "+").replace("_", "/")
    # Pad up to the next multiple of four characters.
    return b64decode(s + "=" * (-len(s) % 4))
def b64sdec(s):
    """Decode possibly unpadded base64 text *s* and return the result as text."""
    raw = b64bdec(s)
    return tostr(raw)
def pf_req(endpoint, **kwargs):
    """POST to a PlayFab API endpoint and return the ``data`` member of the reply.

    Args:
        endpoint: Path appended to ``https://20ca2.playfabapi.com``; it must
            start with ``/``.
        **kwargs: Passed through to ``requests.post`` (for example ``json=``).
            A ``timeout`` of 30 seconds is applied unless the caller
            supplies one.

    Returns:
        The value stored under ``"data"`` in the JSON response body.

    Raises:
        KeyError: if the JSON response has no ``"data"`` member.

    The module-level ``ENT_TOKEN`` is read at call time and sent as the
    ``x-entitytoken`` header.
    """
    # requests waits indefinitely by default; bound the wait so a stalled
    # connection cannot hang the script.
    kwargs.setdefault("timeout", 30)
    headers = {
        "x-entitytoken": ENT_TOKEN,
        "Accept-Language": "en-US",
        "Content-Type": "application/json",
        "User-Agent": "cpprestsdk/2.9.0",
        "x-playfabsdk": "XPlatCppSdk-3.6.190304",
        "x-reporterrorassuccess": "true",
    }
    response = post(f"https://20ca2.playfabapi.com{endpoint}", headers=headers, **kwargs)
    return response.json()["data"]
def xor_bytes(s1, s2):
    """XOR *s1* with *s2* byte by byte and return the result as bytes.

    Both arguments are converted with ``tobytes`` first. The result covers
    the overlapping prefix only, and is additionally capped at
    ``min(len(s1), len(s2))`` measured on the arguments as they were passed.
    """
    limit = min(len(s1), len(s2))
    byte_pairs = list(zip(tobytes(s1), tobytes(s2)))
    return bytes(left ^ right for left, right in byte_pairs[:limit])
def fix_filename(fn):
    """Return *fn* with each of the characters ``<>:"/\\|?*`` replaced by ``_``."""
    # Raw string so the backslash is a literal character rather than the
    # start of an (invalid) escape sequence.
    unsafe = r'<>:"/\|?*'
    # One pass over the name instead of one replace() call per character.
    return fn.translate(str.maketrans({ch: "_" for ch in unsafe}))
def mkdir(path):
    """Create directory *path*, including missing parents; an existing directory is not an error."""
    makedirs(name=path, exist_ok=True)
def aes_decrypt(f, key):
    """Decrypt the rest of the binary stream *f* and return the plaintext.

    An AES cipher in CFB mode is created from *key*, using the first 16
    bytes of *key* as the IV. Data is read from the current position of *f*
    to the end of the stream in 0x400-byte chunks.

    Args:
        f: Binary file-like object positioned at the start of the ciphertext.
        key: AES key as bytes.

    Returns:
        The decrypted data as ``bytes``.
    """
    aes = AES.new(key, AES.MODE_CFB, key[:16])
    # Read until EOF instead of probing the size with seek(): this works on
    # non-seekable streams and avoids an extra pass over zip members.
    pieces = []
    while True:
        chunk = f.read(0x400)
        if not chunk:
            break
        pieces.append(aes.decrypt(chunk))
    # Join once at the end; repeated bytes concatenation is quadratic.
    return b"".join(pieces)
def aes_decrypt_tofile(ef, df, key):
    """Decrypt the rest of the stream *ef* and write the plaintext to *df*.

    An AES cipher in CFB mode is created from *key*, using the first 16
    bytes of *key* as the IV. Data is processed in 0x400-byte chunks, so the
    whole payload is never held in memory.

    Args:
        ef: Binary file-like object positioned at the start of the ciphertext.
        df: Writable binary file-like object receiving the plaintext.
        key: AES key as bytes.
    """
    aes = AES.new(key, AES.MODE_CFB, key[:16])
    # Read until EOF instead of probing the size with seek(): this works on
    # non-seekable streams and avoids an extra pass over zip members.
    while True:
        chunk = ef.read(0x400)
        if not chunk:
            break
        df.write(aes.decrypt(chunk))
def fcopy(sf, cf):
    """Copy everything from the current position of *sf* to its end into *cf*.

    Data is moved in 0x4000-byte chunks.

    Args:
        sf: Readable binary file-like object (source).
        cf: Writable binary file-like object (destination).
    """
    # Read until EOF instead of probing the size with seek()/tell(), so that
    # non-seekable sources work too.
    while True:
        chunk = sf.read(0x4000)
        if not chunk:
            break
        cf.write(chunk)
if __name__ == "__main__":
    # Load the saved entity token. Surrounding whitespace (such as a trailing
    # newline) is stripped because pf_req sends the value verbatim as an HTTP
    # header.
    with open("token.txt", "r") as f:
        ENT_TOKEN = f.read().strip()

    # Exchange it for a fresh token and store that one for the next run.
    ENT_TOKEN = pf_req("/Authentication/GetEntityToken")["EntityToken"]
    with open("token.txt", "w") as f:
        f.write(ENT_TOKEN)

    # The receipt is dot-separated; its second segment is base64-encoded JSON.
    inventory = pf_req("/inventory/GetInventoryItems", json={"ReceiptData":{"DeviceId": DEVICE_ID}})
    keys_data = loads(b64sdec(inventory["Receipt"].split(".")[1]))
    keys_db = {}

    print("Parsing keys...")
    for item in keys_data["Receipt"]["Entitlements"]:
        if item['FriendlyId'] and "ContentKey" in item:
            # Per-user key: EntityId XOR DEVICE_ID. The pack key is that
            # value XORed with every second byte of the decoded ContentKey.
            user_key = xor_bytes(keys_data["Receipt"]["EntityId"], DEVICE_ID)
            content_key = b64bdec(item["ContentKey"])
            key = xor_bytes(user_key, content_key[::2])
            print(f"Key for {item['FriendlyId']} is {tostr(key)}")
            keys_db[item['FriendlyId']] = key

    for frid in keys_db:
        result = pf_req("/Catalog/Search", json={"filter": f"(contentType eq 'MarketplaceDurableCatalog_V1.2') and tags/any(t : t eq '{frid}')"})
        try:
            store_item = result["Items"][0]
            title = store_item['Title']['en-US']
            store_id = store_item['Id']
        except (KeyError, IndexError, TypeError):
            # No usable catalog entry for this key; move on to the next one.
            print(f"No catalog entry found for {frid}, skipping")
            continue

        print(f"Getting {', '.join(i['type'] for i in store_item['DisplayProperties']['packIdentity'])} in {title}, ID {store_id}")
        pub_item = pf_req("/Catalog/GetPublishedItem", json={"ItemId": store_item['Id'], "ETag": ""})["Item"]

        dl_folder = join("downloads", fix_filename(title))
        pack_files = []
        mkdir(dl_folder)

        # Download and unpack each content archive, unless its target folder
        # already exists from an earlier run.
        for cont in pub_item["Contents"]:
            cont_dir = join(dl_folder, f"{cont['Type']}")
            if exists(cont_dir):
                continue

            print(f"Extracting {cont['Type']}...")
            # The timeout keeps a stalled download from hanging the script.
            dl_file = BytesIO(get(cont["Url"], headers={"User-Agent": "libhttpclient/1.0.0.0"}, timeout=60).content)

            with ZipFile(dl_file) as zf:
                for member in zf.namelist():
                    zf.extract(member, path=cont_dir)
                    pack_files.append(join(cont_dir, member))

        mkdir("packs")

        # Each extracted file is itself a zip; write a decrypted copy of it
        # to packs/.
        for pf in pack_files:
            with ZipFile(pf) as zf:
                zfiles = zf.namelist()

                # Classify the pack by the marker files it contains.
                wtype = "rsrc"
                if "level.dat" in zfiles:
                    wtype = "world"
                elif "skins.json" in zfiles:
                    wtype = "skins"

                final_pack = join("packs", f"{fix_filename(title)}.{wtype}.zip")
                if exists(final_pack):
                    continue

                # Names already written to the output; a set gives O(1)
                # membership tests in the final copy pass.
                decrypted_files = set()

                with ZipFile(final_pack, "w") as zof:
                    print(f"Loading {wtype} pack...")
                    ctjson = [n for n in zfiles if n.endswith("contents.json")]
                    lvldb = [n for n in zfiles if n.startswith("db/")]

                    for contf in ctjson:
                        with zf.open(contf) as cf:
                            # Key id: NUL-terminated string at offset 0x11;
                            # the encrypted payload starts at offset 0x100.
                            cf.seek(0x11)
                            key_id = readstr(cf)
                            key = keys_db[key_id]
                            cf.seek(0x100)
                            print(f"Loading contents file {contf}")
                            contents = loads(tostr(aes_decrypt(cf, key)))

                        with zof.open(contf, "w") as dcf:
                            dcf.write(tobytes(dumps(contents)))
                        decrypted_files.add(contf)

                        # Entries carrying a "key" are encrypted with that key.
                        for ftd in contents["content"]:
                            if "key" not in ftd:
                                continue

                            zpath = f"{dirname(contf)}/{ftd['path']}"
                            # A contents.json at the archive root yields a
                            # leading slash, which zip member names lack.
                            if zpath.startswith("/"):
                                zpath = zpath[1:]

                            ckey = tobytes(ftd["key"])
                            print(f"Decrypting {zpath}")

                            with zf.open(zpath) as ef, zof.open(zpath, "w") as df:
                                if zpath == "level.dat":
                                    print("Removing prid from encrypted level.dat")
                                    lvldat = aes_decrypt(ef, ckey)
                                    lvldat = lvldat.replace(b"prid", b"lmao", 1)
                                    df.write(lvldat)
                                elif zpath.endswith("skins.json"):
                                    print("Removing paid attrs from encrypted skins.json")
                                    skjson = aes_decrypt(ef, ckey)
                                    skjson = skjson.replace(b"paid", b"free")
                                    df.write(skjson)
                                else:
                                    aes_decrypt_tofile(ef, df, ckey)

                            decrypted_files.add(zpath)

                    for ldb in lvldb:
                        try:
                            with zf.open(ldb) as lf:
                                lf.seek(0x11)
                                key_id = readstr(lf)
                                key = keys_db[key_id]
                                lf.seek(0x100)
                                print(f"Decrypting {ldb}")
                                with zof.open(ldb, "w") as dlf:
                                    aes_decrypt_tofile(lf, dlf, key)
                            decrypted_files.add(ldb)
                        except (KeyError, IndexError):
                            # No known key id in the header (or the file is
                            # too short to have one): treat the file as
                            # unencrypted and let the pass below copy it.
                            # Both errors occur before anything is written
                            # to the output archive.
                            pass

                    # Everything not handled above is copied through.
                    for uef in zfiles:
                        if uef in decrypted_files:
                            continue

                        with zf.open(uef) as sf, zof.open(uef, "w") as cf:
                            if uef == "level.dat":
                                print("Removing prid from unencrypted level.dat")
                                lvldat = sf.read()
                                lvldat = lvldat.replace(b"prid", b"lmao", 1)
                                cf.write(lvldat)
                            elif uef.endswith("skins.json"):
                                print("Removing paid attrs from unencrypted skins.json")
                                skjson = sf.read()
                                skjson = skjson.replace(b"paid", b"free")
                                cf.write(skjson)
                            else:
                                print(f"Copying unencrypted file {uef}")
                                fcopy(sf, cf)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment