Skip to content

Instantly share code, notes, and snippets.

@WitherOrNot
Created August 7, 2022 02:38
Show Gist options
  • Save WitherOrNot/5e19aa2f583bf107a3f530025ff7be30 to your computer and use it in GitHub Desktop.
Save WitherOrNot/5e19aa2f583bf107a3f530025ff7be30 to your computer and use it in GitHub Desktop.
archive mc marketplace. needs entitytoken in token.txt and device id in script, can be gotten from running fiddler on mcbedrock w/ owner acct logged in
from base64 import b64decode, b64encode
from binascii import hexlify
from requests import post, get
from json import loads, dumps
from Crypto.Cipher import AES
from os import makedirs
from os.path import join, exists, dirname
from zipfile import ZipFile
from io import BytesIO
DEVICE_ID = "<device id>"
def tobytes(s):
if type(s) == str:
return s.encode("utf-8")
else:
return s
def tostr(s):
if type(s) == bytes:
return s.decode("utf-8")
else:
return str(s)
def readstr(f):
s = ""
c = f.read(1)
while c[0] != 0:
s += chr(c[0])
c = f.read(1)
return s
def b64bdec(s):
return b64decode(s + "=" * ((4 - len(s) % 4) % 4) )
def b64sdec(s):
return tostr(b64bdec(s))
def pf_req(endpoint, **kwargs):
return post(f"https://20ca2.playfabapi.com{endpoint}", headers={
"x-entitytoken": ENT_TOKEN,
"Accept-Language": "en-US",
"Content-Type": "application/json",
"User-Agent": "cpprestsdk/2.9.0",
"x-playfabsdk": "XPlatCppSdk-3.6.190304",
"x-reporterrorassuccess": "true"
}, **kwargs).json()["data"]
def xor_bytes(s1, s2):
return bytes(map(lambda t: t[0] ^ t[1], list(zip(tobytes(s1), tobytes(s2)))[:min(len(s1), len(s2))]))
def fix_filename(fn):
for c in '<>:"/\|?*':
fn = fn.replace(c, "_")
return fn
def mkdir(path):
makedirs(path, exist_ok=True)
def aes_decrypt(f, key):
aes = AES.new(key, AES.MODE_CFB, key[:16])
pos = f.tell()
size = f.seek(0, 2)
f.seek(pos)
ptext = b""
while f.tell() < size:
ptext += aes.decrypt(f.read(0x400))
return ptext
def aes_decrypt_tofile(ef, df, key):
aes = AES.new(key, AES.MODE_CFB, key[:16])
pos = ef.tell()
size = ef.seek(0, 2)
ef.seek(pos)
while ef.tell() < size:
df.write(aes.decrypt(ef.read(0x400)))
def fcopy(sf, cf):
pos = sf.tell()
size = sf.seek(0, 2)
sf.seek(pos)
while sf.tell() < size:
cf.write(sf.read(0x4000))
if __name__ == "__main__":
with open("token.txt", "r") as f:
ENT_TOKEN = f.read()
ENT_TOKEN = pf_req("/Authentication/GetEntityToken")["EntityToken"]
with open("token.txt", "w") as f:
f.write(ENT_TOKEN)
inventory = pf_req("/inventory/GetInventoryItems", json={"ReceiptData":{"DeviceId": DEVICE_ID}})
keys_data = loads(b64sdec(inventory["Receipt"].split(".")[1]))
keys_db = {}
print("Parsing keys...")
for item in keys_data["Receipt"]["Entitlements"]:
if item['FriendlyId'] and "ContentKey" in item:
user_key = xor_bytes(keys_data["Receipt"]["EntityId"], DEVICE_ID)
content_key = b64bdec(item["ContentKey"])
key = xor_bytes(user_key, content_key[::2])
print(f"Key for {item['FriendlyId']} is {tostr(key)}")
keys_db[item['FriendlyId']] = key
for frid in keys_db:
result = pf_req("/Catalog/Search", json={"filter": f"(contentType eq 'MarketplaceDurableCatalog_V1.2') and tags/any(t : t eq '{frid}')"})
try:
store_item = result["Items"][0]
title = store_item['Title']['en-US']
store_id = store_item['Id']
except:
continue
print(f"Getting {', '.join(i['type'] for i in store_item['DisplayProperties']['packIdentity'])} in {title}, ID {store_id}")
pub_item = pf_req("/Catalog/GetPublishedItem", json={"ItemId": store_item['Id'], "ETag": ""})["Item"]
dl_folder = join("downloads", fix_filename(title))
pack_files = []
mkdir(dl_folder)
for cont in pub_item["Contents"]:
if exists(join(dl_folder, f"{cont['Type']}")):
continue
print(f"Extracting {cont['Type']}...")
dl_file = BytesIO(get(cont["Url"], headers={"User-Agent": "libhttpclient/1.0.0.0"}).content)
with ZipFile(dl_file) as zf:
for pf in zf.namelist():
zf.extract(pf, path=join(dl_folder, f"{cont['Type']}"))
pack_files.append(join(dl_folder, f"{cont['Type']}", pf))
mkdir("packs")
for pf in pack_files:
with ZipFile(pf) as zf:
zfiles = zf.namelist()
wtype = "rsrc"
if "level.dat" in zfiles:
wtype = "world"
elif "skins.json" in zfiles:
wtype = "skins"
final_pack = join("packs", f"{fix_filename(title)}.{wtype}.zip")
if exists(final_pack):
continue
decrypted_files = []
with ZipFile(final_pack, "w") as zof:
print(f"Loading {wtype} pack...")
ctjson = filter(lambda n: n.endswith("contents.json"), zfiles)
lvldb = filter(lambda n: n.startswith("db/"), zfiles)
for contf in ctjson:
with zf.open(contf) as cf:
cf.seek(0x11)
key_id = readstr(cf)
key = keys_db[key_id]
cf.seek(0x100)
print(f"Loading contents file {contf}")
contents = loads(tostr(aes_decrypt(cf, key)))
with zof.open(contf, "w") as dcf:
dcf.write(tobytes(dumps(contents)))
decrypted_files.append(contf)
for ftd in contents["content"]:
if "key" in ftd:
zpath = f"{dirname(contf)}/{ftd['path']}"
if zpath[0] == "/":
zpath = zpath[1:]
ckey = tobytes(ftd["key"])
print(f"Decrypting {zpath}")
with zf.open(zpath) as ef:
with zof.open(zpath, "w") as df:
if zpath == "level.dat":
print("Removing prid from encrypted level.dat")
lvldat = aes_decrypt(ef, ckey)
lvldat = lvldat.replace(b"prid", b"lmao", 1)
df.write(lvldat)
elif zpath.endswith("skins.json"):
print("Removing paid attrs from encrypted skins.json")
skjson = aes_decrypt(ef, ckey)
skjson = skjson.replace(b"paid", b"free")
df.write(skjson)
else:
aes_decrypt_tofile(ef, df, ckey)
decrypted_files.append(zpath)
for ldb in lvldb:
try:
with zf.open(ldb) as lf:
lf.seek(0x11)
key_id = readstr(lf)
key = keys_db[key_id]
lf.seek(0x100)
print(f"Decrypting {ldb}")
with zof.open(ldb, "w") as dlf:
aes_decrypt_tofile(lf, dlf, key)
decrypted_files.append(ldb)
except:
pass
for uef in zfiles:
if uef not in decrypted_files:
with zf.open(uef) as sf:
with zof.open(uef, "w") as cf:
if uef == "level.dat":
print("Removing prid from unencrypted level.dat")
lvldat = sf.read()
lvldat = lvldat.replace(b"prid", b"lmao", 1)
cf.write(lvldat)
elif uef.endswith("skins.json"):
print("Removing paid attrs from unencrypted skins.json")
skjson = sf.read()
skjson = skjson.replace(b"paid", b"free")
cf.write(skjson)
else:
print(f"Copying unencrypted file {uef}")
fcopy(sf, cf)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment