Skip to content

Instantly share code, notes, and snippets.

@PolyShifter
Created October 14, 2024 19:26
Show Gist options
  • Save PolyShifter/4e730aae0256beb882c909d643b46661 to your computer and use it in GitHub Desktop.
Save PolyShifter/4e730aae0256beb882c909d643b46661 to your computer and use it in GitHub Desktop.
Download Quixel Megascans with the ability to dictate what resolution and file type you want to download. Also checks for corrupt zip files. Tested with Surfaces and 3d Assets.
import zipfile
import os
import json
import concurrent.futures
import requests
import shutil
import time
######## INITIAL SETUP ########
# Define the token, download path, and target category.
# TOKEN is sent as the "Authorization: Bearer <TOKEN>" header when download
# ids are requested.
TOKEN = "Your Token Here"
# Update with the correct path to the directory the zips are saved in.
# An empty string keeps the download paths relative to the current working
# directory.
# Example: "C:/Users/MyAccount/Documents/QuixelZips"
DOWNLOAD_PATH = ""
# Update with the correct path to the ms_asset_categories.json file, not the
# directory.
# Example: "C:/Users/MyAccount/Documents/QuixelZips/ms_asset_categories.json"
JSON_FILE_PATH = "./ms_asset_categories.json"
# Path of the download cache (a text file holding one asset id per line).
# Ids listed here are skipped on later runs unless OVERWRITE is set. The file
# is opened in append mode, so it is created if it does not exist yet.
# Example: "C:/Users/MyAccount/Documents/Quixel/cache.txt"
CACHE_FILE_PATH = "./cache.txt"
# Download "complete_asset_metadata.tar.zst" from
# https://github.com/WAUthethird/quixel-megascans-scripts, then extract it
# and point this at the resulting JSON file.
ASSET_METADATA_PATH = "./asset_metadata.json"
## Set target download category/categories.
# Parts separated by "/" must ALL match an asset's categories for it to be
# selected. Matching ignores case and trailing "s".
# Working: 3d asset, 3d plant, surface, brush, displacement, imperfection, decal
TARGET_CATEGORY = "3d assets"
# Create subdirectories based on the category
SAVE_IN_SUBDIRECTORIES = True
# Download HighPoly?
HIGHPOLY = False
# Download ztool (ZBrush) file?
ZTOOL = False
# Use to re-download items that are already listed in the cache. (Example: you
# want to download a different texture size, or a previous run went wrong and
# you had to adjust the script to try again.)
OVERWRITE = False
# Set a limit for the number of items you want to download. 0 means no limit
MAX_ITEM_COUNT = 0
# Specify the image type and resolution for all textures, or for default.
# If a texture type is not listed, the "default" entry is used for it.
# If the resolution specified does not exist for that item (surface, asset, etc)
# then it will default to the highest resolution available for that item.
# So if you specify "8192x8192" but the highest that exists is 2048x2048, then it
# will download 2048x2048 instead. Note that some textures are not square, and
# so the highest resolution available will still download, e.g. 2048x1024.
MIME_TEXTURE_TYPES = {
"default": {"mimeType": "image/jpeg", "resolution": "4096x4096"},
# "albedo": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "ao": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "brush": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "bump": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "cavity": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "curvature": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "diffuse": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
"displacement": {"mimeType": "image/x-exr", "resolution": "8192x8192"},
# "f": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "fuzz": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "gloss": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "mask": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "metalness": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
"normal": {"mimeType": "image/x-exr", "resolution": "4096x4096"},
# "normalbump": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "normalobject": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "occlusion": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "opacity": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "roughness": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "specular": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "thickness": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "translucency": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
# "transmission": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
}
######## SETUP - FINISHED ########
# Canonicalise a category label so comparisons ignore padding, case and plurals
def normalize_category(category):
    """Return *category* trimmed, lower-cased and without trailing "s".

    Every trailing "s" is removed, not only one, so ``"Grass"`` becomes
    ``"gra"``. That is harmless for the substring matching this script does,
    but the result should not be shown to users as a display name.
    """
    trimmed = category.strip()
    lowered = trimmed.lower()
    return lowered.rstrip("s")
# Flatten a nested category mapping into one separator-joined string
def categories_to_string(categories, separator=" "):
    """Flatten the nested *categories* structure into a single string.

    For each ``key: value`` pair of a dict:

    * a non-empty dict value is flattened recursively and emitted as
      ``key + separator + flattened`` (or just ``key`` when the flattened
      text is empty);
    * a list value contributes each of its items, normalised with
      ``normalize_category``;
    * anything else contributes the normalised key.

    A non-dict argument yields an empty string.
    """
    if not isinstance(categories, dict):
        return separator.join([])

    pieces = []
    for name, children in categories.items():
        if isinstance(children, dict) and children:
            nested = categories_to_string(children, separator)
            pieces.append(f"{name}{separator}{nested}" if nested else name)
        elif isinstance(children, list):
            pieces.extend(normalize_category(entry) for entry in children)
        else:
            pieces.append(normalize_category(name))
    return separator.join(pieces)
def get_asset_download_id(asset_id):
    """Request a download id for *asset_id* from the Quixel downloads endpoint.

    The payload built by ``get_asset_payload`` is POSTed to the endpoint. When
    the server answers ``INVALID_PAYLOAD`` with a "<json> type not found"
    message, the rejected component is removed from the payload and the
    request is retried. One attempt is made per component, and always at
    least one, so assets with a single component are requested too.

    Args:
        asset_id: Megascans asset id to request.

    Returns:
        The ``"id"`` field of the successful response, or ``None`` when no
        payload could be built or the server kept rejecting the request.

    Note:
        A 401 response terminates the whole process through ``os._exit(0)``,
        because the token is shared by every worker thread.
    """
    url = "https://quixel.com/v1/downloads"
    payload = get_asset_payload(asset_id)
    if not payload:
        # get_asset_payload already reported that the asset is unknown.
        return None

    components = payload["components"]
    # Each failed attempt removes one component, so there is nothing left to
    # try after len(components) attempts.
    max_attempts = max(1, len(components))

    for _ in range(max_attempts):
        response = requests.post(
            url,
            headers={"Authorization": "Bearer " + TOKEN},
            json=payload,
            timeout=60,  # never let a stalled connection block a worker forever
        )
        if response.status_code == 200:
            print(f"Found Download URL for {asset_id}")
            return response.json()["id"]
        if response.status_code == 401:
            print((
                "Possible expired token. Please get a new one from "
                "https://quixel.com/megascans/home/ and then update the script.\n"
                "If you just ran the script and downloaded stuff prior to this, "
                "just re-run the script and try again."
            ))
            # Hard exit on purpose: a regular exit would wait for the other
            # worker threads, which would all fail the same way.
            os._exit(0)

        print(f"Failed to get asset download url for id: {asset_id}")
        try:
            error = response.json()
        except ValueError:
            # Gateway errors (e.g. 502) come back as HTML, not JSON.
            print(
                "DEBUG_ERROR: HTTP " + str(response.status_code) + " "
                + response.text[:200]
            )
            return None

        message = str(error.get("msg", "")) if isinstance(error, dict) else ""
        is_missing_type = (
            isinstance(error, dict)
            and error.get("code") == "INVALID_PAYLOAD"
            and "type not found" in message
        )
        if not is_missing_type:
            print("DEBUG_ERROR: " + str(error))
            return None

        # The message is the rejected component as JSON followed by the text
        # " type not found".
        try:
            rejected = json.loads(message.replace(" type not found", ""))
        except ValueError:
            print("DEBUG_ERROR: " + str(error))
            return None
        rejected_type = rejected.get("type") if isinstance(rejected, dict) else None

        for idx, component in enumerate(components):
            if component["type"] == rejected_type:
                components.pop(idx)
                print("Removed " + str(rejected_type) + " from payload. Trying again...")
                break
        else:
            # Nothing was removed, so a retry would send the same payload.
            print("DEBUG_ERROR: " + str(error))
            return None

    return None
def check_zip_file(file_path):
    """Verify that *file_path* is a readable zip archive with intact members.

    Args:
        file_path: Path of the zip file to verify.

    Returns:
        ``None`` when every member passes its CRC check.

    Raises:
        zipfile.BadZipFile: If the file is not a zip archive at all, or if a
            member fails its CRC/header check. Raising for a damaged member
            lets the caller treat both kinds of corruption the same way
            instead of reporting a broken archive as a successful download.
    """
    # The context manager closes the handle so the caller can delete the file
    # afterwards (an open handle blocks deletion on Windows).
    with zipfile.ZipFile(file_path) as archive:
        first_bad_member = archive.testzip()
    if first_bad_member is not None:
        print("!! First bad file in zip: %s" % first_bad_member)
        raise zipfile.BadZipFile(
            "Corrupt member %r in %s" % (first_bad_member, file_path)
        )
def download_asset(download_id, download_directory, asset_id):
    """Download the zip for *download_id* into *download_directory*.

    The archive is streamed to disk with one HTTP request per attempt and
    then verified with ``check_zip_file``. Network errors and corrupt
    archives are retried up to five times with a growing delay; any partial
    or corrupt file is removed before the next attempt.

    Args:
        download_id: Id returned by the downloads endpoint.
        download_directory: Directory to save into (created if missing).
        asset_id: Asset id, used to build a readable file name when the
            asset is present in the loaded metadata.

    Returns:
        ``True`` when a verified zip was written, ``False`` when the server
        answered with a non-200 status or every attempt failed.
    """
    os.makedirs(download_directory, exist_ok=True)
    url = f"https://assetdownloads.quixel.com/download/{download_id}?preserveStructure=True&url=https%3A%2F%2Fquixel.com%2Fv1%2Fdownloads"
    # Characters that are not allowed in file names on common file systems.
    unsafe_chars = str.maketrans({char: "_" for char in '\\/:*?"<>|'})
    delay = 3
    max_attempts = 5

    for attempt in range(1, max_attempts + 1):
        file_path = None
        try:
            # (connect, read) timeouts: the read timeout applies between
            # chunks, so large files are fine but a stalled stream is not.
            with requests.get(url, stream=True, timeout=(10, 120)) as response:
                if response.status_code != 200:
                    print((
                        f"ERROR: Failed to download asset {download_id}, status code: "
                        f"{response.status_code}\nResponse: {response.text}"
                    ))
                    return False

                filename = ""
                content_disposition = response.headers.get("Content-Disposition")
                if content_disposition and "filename=" in content_disposition:
                    header_name = content_disposition.split("filename=")[-1]
                    header_name = header_name.split(";")[0].strip().strip('"')
                    # Keep only the last path component: the header comes from
                    # the server and must not steer where the file is written.
                    filename = os.path.basename(header_name.replace("\\", "/"))
                if not filename:
                    filename = download_id

                asset = asset_metadata["asset_metadata"].get(asset_id)
                if asset:
                    asset_name = asset["name"].strip().replace(" ", "_")
                    asset_name = asset_name.translate(unsafe_chars)
                    ext = os.path.splitext(filename)[-1]
                    filename = f"{asset_name}_{asset_id}{ext}"

                file_path = os.path.join(download_directory, filename)
                print(f"Downloading file: {file_path} ###")
                with open(file_path, 'wb') as f:
                    # iter_content reports broken connections as requests
                    # exceptions, which the handler below retries.
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        f.write(chunk)

            check_zip_file(file_path.replace('\\', '/'))
            return True
        except requests.exceptions.RequestException as e:
            print(f"Error during download: {e}")
        except zipfile.BadZipFile:
            print((
                "Bad zip file found, removing zip file and attempting to "
                "redownload..."
            ))

        # Only reached after a failed attempt: drop the partial/corrupt file.
        if file_path and os.path.exists(file_path):
            os.remove(file_path)
        if attempt < max_attempts:
            time.sleep(delay)
            delay += 3

    print(f"Exceeded maximum retry attempts for asset {download_id}")
    return False
# Worker run on the thread pool: resolve one asset's download id, then fetch it
def download_asset_with_id(asset):
    """Download a single asset described by *asset*.

    Args:
        asset: Mapping with an ``"asset_id"`` and a category ``"path"``. The
            lower-cased path is used as a sub-directory of ``DOWNLOAD_PATH``
            when ``SAVE_IN_SUBDIRECTORIES`` is enabled.

    Returns:
        The result of ``download_asset``, or ``False`` when no download id
        could be obtained.
    """
    asset_id = asset["asset_id"]
    sub_directory = asset["path"].lower() if SAVE_IN_SUBDIRECTORIES else ""

    download_id = get_asset_download_id(asset_id)
    if not download_id:
        print(f"No download id found for {asset_id}.")
        return False

    target_directory = os.path.join(DOWNLOAD_PATH, sub_directory)
    return download_asset(download_id, target_directory, asset_id)
def get_asset_payload(asset_id):
    """Build the JSON payload that requests a download for *asset_id*.

    For every texture type of the asset, the mime type and resolution come
    from ``MIME_TEXTURE_TYPES`` (fields missing from a texture's own entry
    fall back to the ``"default"`` entry). When the requested resolution is
    not available for that texture, the available resolution with the
    largest width + height is used instead.

    Args:
        asset_id: Megascans asset id.

    Returns:
        The payload dict, or ``False`` when the asset is not present in the
        loaded metadata.
    """
    catalogue = asset_metadata["asset_metadata"]

    # Fast path: look the asset up by key; only scan the whole catalogue when
    # the key is missing or does not belong to this id.
    asset = catalogue.get(asset_id)
    if not asset or asset.get("full_metadata", {}).get("id") != asset_id:
        asset = next(
            (
                candidate
                for candidate in catalogue.values()
                if candidate["full_metadata"]["id"] == asset_id
            ),
            None,
        )
    if asset is None:
        print(f"Asset {asset_id} not found in asset_metadata.json!")
        return False

    full_metadata = asset["full_metadata"]
    texture_types = {}
    if "components" in full_metadata:
        for component in full_metadata["components"]:
            resolutions = []
            for uri in component["uris"]:
                resolutions.extend(
                    entry["resolution"] for entry in uri["resolutions"]
                )
            texture_types[component["type"]] = resolutions
    # Surfaces
    else:
        for texture_map in full_metadata.get("maps", []):
            texture_types.setdefault(texture_map["type"], []).append(
                texture_map["resolution"]
            )

    default_settings = MIME_TEXTURE_TYPES["default"]
    asset_components = []
    for texture, resolutions in texture_types.items():
        settings = {**default_settings, **MIME_TEXTURE_TYPES.get(texture, {})}
        resolution_requested = settings["resolution"]

        # Get highest AVAILABLE resolution for that texture
        if resolution_requested not in resolutions:
            highest_value = 0
            for res in resolutions:
                try:
                    width, height = str(res).lower().split("x")
                    res_sum = int(width) + int(height)
                except ValueError:
                    # Not a "<width>x<height>" string; cannot be ranked.
                    continue
                if res_sum > highest_value:
                    highest_value = res_sum
                    resolution_requested = res

        asset_components.append({
            "type": texture,
            "mimeType": settings["mimeType"],
            "resolution": resolution_requested,
        })

    payload = {
        "asset": asset_id,
        "config": {
            "highpoly": HIGHPOLY,
            "lowerlod_meshes": True,
            "lowerlod_normals": True,
            "ztool": ZTOOL,
            "brushes": True,
            "meshMimeType": "application/x-fbx",
            "albedo_lods": True,
        },
        "components": asset_components,
    }
    return payload
# Script entry point: select the assets of TARGET_CATEGORY that still need
# downloading, ask for confirmation, then download them on a thread pool.
if __name__ == "__main__":
    with open(JSON_FILE_PATH, "r", encoding="utf-8") as f:
        asset_categories_dict = json.load(f)
    print(f"Trying to match against target category: {TARGET_CATEGORY}")

    # Load cached assets (one asset id per line).
    cached_assets = set()
    if os.path.exists(CACHE_FILE_PATH):
        with open(CACHE_FILE_PATH, "r", encoding="utf-8") as cache_file:
            cached_assets = set(cache_file.read().splitlines())

    # Normalize target category for matching
    normalized_target_categories = [
        normalize_category(part) for part in TARGET_CATEGORY.split("/")
    ]

    matching_asset_ids = []
    # Check matches for each asset in the loaded categories
    for asset_id, categories in asset_categories_dict.items():
        # Already downloaded and not asked to overwrite: nothing to do.
        if asset_id in cached_assets and not OVERWRITE:
            continue
        # Convert the categories to a single string for matching
        categories_str = categories_to_string(categories).lower()
        # Every part of the target category must occur in the categories string
        if all(part in categories_str for part in normalized_target_categories):
            matching_asset_ids.append({
                "asset_id": asset_id,
                "path": categories_to_string(categories, "/"),
            })

    if not matching_asset_ids:
        print("No new assets found for the target category.")
        raise SystemExit

    print(f"{len(matching_asset_ids)} assets found.")

    # Allow user to decide how many to download at once.
    if MAX_ITEM_COUNT:
        item_count = min(len(matching_asset_ids), MAX_ITEM_COUNT)
    else:
        item_count = len(matching_asset_ids)

    confirmation = input((
        f"Do you want to download {item_count} {TARGET_CATEGORY} assets? "
        "(y/n): "
    )).strip().lower()
    if confirmation != "y":
        print("Download canceled.")
        raise SystemExit

    # asset_metadata is a module-level name on purpose: the download helpers
    # read it as a global.
    try:
        with open(ASSET_METADATA_PATH, "r", encoding="utf-8") as f:
            asset_metadata = json.load(f)
    except FileNotFoundError:
        print((
            "Couldn't find asset_metadata.json in the directory you selected, "
            f"{ASSET_METADATA_PATH}. Quitting."
        ))
        # Non-zero status so calling scripts can tell this was a failure.
        raise SystemExit(1)

    time_start = time.time()
    failed_asset_ids = []
    with open(CACHE_FILE_PATH, "a+", encoding="utf-8") as cache_file:
        # Use threading for faster downloading
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            download_count = item_count
            futures = {
                executor.submit(download_asset_with_id, asset): asset
                for asset in matching_asset_ids[:item_count]
            }
            for future in concurrent.futures.as_completed(futures):
                asset_id = futures[future]["asset_id"]
                try:
                    result = future.result()
                except Exception as e:
                    # Boundary of the worker threads: report and carry on with
                    # the remaining downloads.
                    print(f"Error downloading asset {asset_id}: {e}")
                    failed_asset_ids.append(asset_id)
                    continue
                if not result:
                    failed_asset_ids.append(asset_id)
                    continue
                download_count -= 1
                print(f"{download_count} remaining items to download.")
                # Add the asset to the cache file after successful download.
                # With OVERWRITE the id may already be listed; do not add it
                # a second time.
                if asset_id not in cached_assets:
                    cached_assets.add(asset_id)
                    cache_file.write(f"{asset_id}\n")
                    cache_file.flush()

    if failed_asset_ids:
        print(
            f"{len(failed_asset_ids)} assets failed to download: "
            + ", ".join(failed_asset_ids)
        )
    print(f"Time Taken: {time.time()-time_start} seconds")
// Look up a single cookie by name in document.cookie.
// Returns the raw (still URL-encoded) value, or undefined when the cookie is
// absent or the name does not split the cookie string into exactly two parts.
function getCookie(name) {
  const haystack = `; ${document.cookie}`;
  const pieces = haystack.split(`; ${name}=`);
  if (pieces.length !== 2) {
    return undefined;
  }
  // Everything up to the next ';' belongs to the requested cookie.
  const [cookieValue] = pieces[1].split(';');
  return cookieValue;
}
// Read the "auth" cookie of the current page
const authCookie = getCookie('auth');
// The cookie is expected to hold URL-encoded JSON with a `token` field;
// decode it and print the token so it can be pasted into the script.
if (!authCookie) {
  console.error("Auth cookie not found. Please make sure you are logged in.");
} else {
  try {
    const decodedCookie = decodeURIComponent(authCookie);
    const { token } = JSON.parse(decodedCookie);
    console.log("Auth Token:", token);
  } catch (error) {
    console.error("Error parsing auth cookie:", error);
  }
}
@jecete
Copy link

jecete commented Dec 2, 2024

Hi, I think I have followed all the instructions correctly. But I am getting an error that reads,

ModuleNotFoundError: No module named 'requests'

Any idea what i might be doing wrong?

@ArtFreex
Copy link

ArtFreex commented Dec 5, 2024

Anyone able to download the "brush" category? I am getting a bunch of "No download id found for ______."
(Apart from a bunch of 502 errors as well as some "ACCESS DENIED" errors for multiple other things..)

@rnckp
Copy link

rnckp commented Dec 9, 2024

Hi, I think I have followed all the instructions correctly. But I am getting an error that reads,

ModuleNotFoundError: No module named 'requests'

Any idea what i might be doing wrong?

Have you tried pip install requests?

@ProtoNoob
Copy link

Hey @PolyShifter,

Using the script on Surfaces and going well. Downloading 2k versions and getting about 1000-2000 files per hour (getting faster as the night goes on... probably because of less traffic). I am getting a significant number of 'connection broken' errors though (about 1 out of 10). Any suggestions?

Also, are there any categories that will be problematic? Any workarounds?

@ProtoNoob
Copy link

@Jarritus

Did you ever get 3D assets working?

@ProtoNoob
Copy link

@ArtFreex I am having issues with brushes as well. Did you find a solution?

I have some ability to modify the code so if you can point me in the right direction I can probably take care of it.

@ArtFreex
Copy link

@ProtoNoob sadly no... I kinda gave up on getting those to work. And I also can't point you in the right direction as I can't seem to make sense of that error..

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment