Created
October 14, 2024 19:26
-
-
Save PolyShifter/4e730aae0256beb882c909d643b46661 to your computer and use it in GitHub Desktop.
Download Quixel Megascans with the ability to dictate what resolution and file type you want to download. Also checks for corrupt zip files. Tested with Surfaces and 3d Assets.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zipfile | |
import os | |
import json | |
import concurrent.futures | |
import requests | |
import shutil | |
import time | |
######## INITIAL SETUP ######## | |
# Define the token, download path, and target category | |
TOKEN = "Your Token Here" | |
# Update with the correct path to the directory | |
# Example: "C:/Users/MyAccount/Documents/QuixelZips" | |
DOWNLOAD_PATH = "" | |
# Update with the correct path to the ms_asset_categories.json file, not the | |
# directory. | |
# Example: C:/Users/MyAccount/Documents/QuixelZips/ms_asset_categoies.json" | |
JSON_FILE_PATH = "./ms_asset_categories.json" | |
# Pick a folder to store the download cache at (a text file). Then create the | |
# text file cache.txt there. Paste it below. | |
# IE: "C:/Users/MyAccount/Documents/Quixel/cache.txt" | |
CACHE_FILE_PATH = "./cache.txt" | |
# Download from https://github.com/WAUthethird/quixel-megascans-scripts --> | |
# "complete_asset_metadata.tar.zst", then unzip it. | |
ASSET_METADATA_PATH = "./asset_metadata.json" | |
## Set target download category/categories. | |
#working: 3d asset, 3d plant, surface, brush, displacement, imperfection, decal | |
TARGET_CATEGORY = "3d assets" | |
# Create subdirectories based on the category | |
SAVE_IN_SUBDIRECTORIES = True | |
# Download HighPoly? | |
HIGHPOLY = False | |
# Donwload ztool (ZBrush) file? | |
ZTOOL = False | |
# Use to overwrite existing cached items. (Example if you want to downlaod a | |
# different size texture. Or if they messed up and you had to adjust script to | |
# try again.) | |
OVERWRITE = False | |
# Set a limit for the number of items you want to download. 0 means no limit | |
MAX_ITEM_COUNT = 0 | |
# Specify the image type and resolution for all textures, or for default. | |
# If a value is not specified, then it will pull from the default item. | |
# If the resolution specified does not exist for that item (surface, asset, etc) | |
# then it will default to the highest resolution available for that item. | |
# So if you specify "8192x8192" but the highest that exists is 2048x2048, then it | |
# will download 2048x2048 instead. Note that some textures are not square, and | |
# so the highest resolution available will still download. IE could be 2048x1024 | |
MIME_TEXTURE_TYPES = { | |
"default": {"mimeType": "image/jpeg", "resolution": "4096x4096"}, | |
# "albedo": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "ao": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "brush": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "bump": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "cavity": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "curvature": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "diffuse": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
"displacement": {"mimeType": "image/x-exr", "resolution": "8192x8192"}, | |
# "f": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "fuzz": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "gloss": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "mask": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "metalness": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
"normal": {"mimeType": "image/x-exr", "resolution": "4096x4096"}, | |
# "normalbump": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "normalobject": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "occlusion": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "opacity": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "roughness": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "specular": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "thickness": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "translucency": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
# "transmission": {"mimeType": "image/jpeg", "resolution": "2048x2048"}, | |
} | |
######## SETUP - FINISHED ######## | |
# Function to normalize category strings | |
def normalize_category(category): | |
return category.strip().lower().rstrip('s') | |
# Function to convert categories to a space-separated string | |
def categories_to_string(categories, separator=" "): | |
result = [] | |
if isinstance(categories, dict): | |
for key, value in categories.items(): | |
if isinstance(value, dict) and value: | |
subcategories = categories_to_string(value, separator) | |
if subcategories: | |
result.append(f"{key}{separator}{subcategories}") | |
else: | |
result.append(key) | |
elif isinstance(value, list): | |
for item in value: | |
result.append(normalize_category(item)) | |
else: | |
result.append(normalize_category(key)) | |
return separator.join(result) | |
def get_asset_download_id(asset_id): | |
url = "https://quixel.com/v1/downloads" | |
paramToPass = get_asset_payload(asset_id) | |
trys = 0 | |
paramLen = len(paramToPass["components"]) | |
while(trys < paramLen-1): | |
trys += 1 | |
download_url_response = requests.post( | |
url, | |
headers={"Authorization": "Bearer " + TOKEN}, | |
json=paramToPass | |
) | |
if download_url_response.status_code == 200: | |
print(f"Found Download URL for {asset_id}") | |
return download_url_response.json()["id"] | |
elif download_url_response.status_code == 401: | |
print(( | |
"Possible expired token. Please get a new one from " | |
"https://quixel.com/megascans/home/ and then update the script.\n" | |
"If you just ran the script and downloaded stuff prior to this, " | |
"just re-run the script and try again." | |
)) | |
os._exit(0) | |
else: | |
print(f"Failed to get asset download url for id: {asset_id}") | |
response = download_url_response.json() | |
if response["code"] == "INVALID_PAYLOAD": | |
if "type not found" in response["msg"]: | |
payload = json.loads(response["msg"].replace(" type not found", "")) | |
if payload: | |
for idx, v in enumerate(paramToPass["components"]): | |
if paramToPass["components"][idx]["type"] == payload["type"]: | |
paramToPass["components"].pop(idx) | |
print("Removed "+payload["type"]+" from payload. Trying again...") | |
break | |
else: | |
print("DEBUG_ERROR: " + str(response)) | |
return None | |
else: | |
print("DEBUG_ERROR: " + str(response)) | |
return None | |
def check_zip_file(file_path): | |
the_zip_file = zipfile.ZipFile(file_path) | |
result = the_zip_file.testzip() | |
if result is not None: | |
print("!! First bad file in zip: %s" % result) | |
def download_asset(download_id, download_directory, asset_id): | |
# full_name = asset_metadata["asset_metadata"][download_id] | |
os.makedirs(download_directory, exist_ok=True) | |
url = f"https://assetdownloads.quixel.com/download/{download_id}?preserveStructure=True&url=https%3A%2F%2Fquixel.com%2Fv1%2Fdownloads" | |
response = requests.get(url, stream=True) | |
if response.status_code == 400: | |
# Print the response to see what's causing the issue | |
print(f"Error 400: {response.text}") | |
attempt_count = 0 | |
delay = 3 | |
max_attempts = 5 | |
while attempt_count < max_attempts: | |
response = requests.get(url, stream=True) | |
if response.status_code == 200: | |
content_disposition = response.headers.get("Content-Disposition") | |
if content_disposition: | |
filename = content_disposition.split("filename=")[-1].strip('"') | |
else: | |
filename = download_id | |
asset = asset_metadata["asset_metadata"].get(asset_id) | |
if asset: | |
asset_name = asset["name"].strip().replace(" ", "_") | |
ext = os.path.splitext(filename)[-1] | |
base_path = os.path.dirname(filename) | |
filename = os.path.join(base_path, f"{asset_name}_{asset_id}{ext}") | |
file_path = os.path.join(download_directory, filename) | |
try: | |
print(f"Downloading file: {file_path} ###") | |
with requests.get(url, stream=True) as r: | |
with open(file_path, 'wb') as f: | |
shutil.copyfileobj(r.raw, f) | |
check_zip_file(file_path.replace('\\', '/')) | |
return True | |
except requests.exceptions.ChunkedEncodingError as e: | |
print(f"Error during download: {e}") | |
time.sleep(delay) | |
delay += 3 | |
attempt_count += 1 | |
except zipfile.BadZipFile: | |
print(( | |
"Bad zip file found, removing zip file and attempting to " | |
"redownload..." | |
)) | |
os.remove(file_path) | |
else: | |
print(( | |
f"ERROR: Failed to download asset {download_id}, status code: " | |
f"{response.status_code}\nResponse: {response.text}" | |
)) | |
return False | |
print(f"Exceeded maximum retry attempts for asset {download_id}") | |
return False | |
# Function to handle downloading for threading | |
def download_asset_with_id(asset): | |
asset_id = asset["asset_id"] | |
path = "" | |
if(SAVE_IN_SUBDIRECTORIES): | |
path = asset["path"].lower() | |
download_id = get_asset_download_id(asset_id) | |
if download_id: | |
return download_asset(download_id, os.path.join(DOWNLOAD_PATH, path), asset_id) | |
else: | |
print(f"No download id found for {asset_id}.") | |
return False | |
def get_asset_payload(asset_id): | |
asset = [asset for asset in asset_metadata["asset_metadata"].values() if asset["full_metadata"]["id"] == asset_id] | |
if len(asset) <=0: | |
print(f"Asset {asset_id} not found in asset_metadata.json!") | |
return False | |
asset = asset[0] | |
if "components" in asset["full_metadata"]: | |
texture_types = {} | |
for component in asset["full_metadata"]["components"]: | |
resolutions = [] | |
for uri in component["uris"]: | |
for resolution_dict in uri["resolutions"]: | |
resolution = resolution_dict['resolution'] | |
resolutions.append(resolution) | |
texture_types[component["type"]] = resolutions | |
# Surfaces | |
else: | |
texture_types = {} | |
for map in asset["full_metadata"]["maps"]: | |
texture = map["type"] | |
if texture not in texture_types: | |
texture_types[texture] = [] | |
texture_types[texture].append(map["resolution"]) | |
asset_components = [] | |
for texture, resolutions in texture_types.items(): | |
resolution_requested = MIME_TEXTURE_TYPES.get( | |
texture, | |
MIME_TEXTURE_TYPES["default"] | |
)["resolution"] | |
# Get highest AVAILABLE resolution for that texture | |
if resolution_requested not in resolutions: | |
highest_value = 0 | |
for res in resolutions: | |
w, h = res.split("x") | |
res_sum = int(w) + int(h) | |
if res_sum > highest_value: | |
highest_value = res_sum | |
resolution_requested = res | |
asset_components.append({ | |
"type": texture, | |
"mimeType": MIME_TEXTURE_TYPES.get(texture, MIME_TEXTURE_TYPES["default"])["mimeType"], | |
"resolution": resolution_requested, | |
}) | |
payload = {"asset": asset_id, | |
"config": {"highpoly": HIGHPOLY, | |
"lowerlod_meshes": True, | |
"lowerlod_normals": True, | |
"ztool": ZTOOL, | |
"brushes": True, | |
"meshMimeType": "application/x-fbx", | |
"albedo_lods": True}, | |
"components": asset_components} | |
return payload | |
if __name__ == "__main__": | |
with open(JSON_FILE_PATH, 'r') as f: | |
asset_categories_dict = json.load(f) | |
print(f"Trying to match against target category: {TARGET_CATEGORY}") | |
# Load cached assets | |
cached_assets = set() | |
if os.path.exists(CACHE_FILE_PATH): | |
with open(CACHE_FILE_PATH, "r") as cache_file: | |
cached_assets = set(cache_file.read().splitlines()) | |
# Normalize target category for matching | |
normalized_target_categories = [normalize_category(part) for part in TARGET_CATEGORY.split("/")] | |
matching_asset_ids = [] | |
# Check matches for each asset in the loaded categories | |
for asset_id, categories in asset_categories_dict.items(): | |
# Convert the categories to a single string for matching | |
categories_str = categories_to_string(categories) | |
categories_path = categories_to_string(categories, "/") | |
# Check if all parts of target_category exist in the categories string | |
matches = all(normalize_category(part) in categories_str.lower() for part in normalized_target_categories) | |
if matches and asset_id not in cached_assets: | |
matching_asset_ids.append({"asset_id": asset_id, "path": categories_path}) | |
elif matches and asset_id in cached_assets and OVERWRITE: | |
matching_asset_ids.append({"asset_id": asset_id, "path": categories_path}) | |
if not matching_asset_ids: | |
print("No new assets found for the target category.") | |
exit() | |
print(f"{len(matching_asset_ids)} assets found.") | |
# Allow user to decide how many to download at once. | |
if MAX_ITEM_COUNT: | |
item_count = min(len(matching_asset_ids), MAX_ITEM_COUNT) | |
else: | |
item_count = len(matching_asset_ids) | |
confirmation = input(( | |
f"Do you want to download {item_count} {TARGET_CATEGORY} assets? " | |
"(y/n): " | |
)).strip().lower() | |
if confirmation != "y": | |
print("Download canceled.") | |
exit() | |
try: | |
with open(ASSET_METADATA_PATH, "r", encoding="utf-8") as f: | |
asset_metadata = json.load(f) | |
except FileNotFoundError: | |
print(( | |
"Couldn't find asset_metadata.json in the directory you selected, " | |
f"{ASSET_METADATA_PATH}. Quitting." | |
)) | |
exit() | |
time_start = time.time() | |
with open(CACHE_FILE_PATH, "a+") as cache_file: | |
# Use threading for faster downloading | |
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: | |
download_count = item_count | |
futures = {executor.submit(download_asset_with_id, asset): asset for asset in matching_asset_ids[:item_count]} | |
for future in concurrent.futures.as_completed(futures): | |
asset = futures[future] | |
asset_id = asset["asset_id"] | |
try: | |
result = future.result() | |
if result: | |
download_count-=1 | |
print(f"{download_count} remaining items to download.") | |
# Add the asset to the cache file after successful download | |
cache_file.write(f"{asset_id}\n") | |
cache_file.flush() | |
except Exception as e: | |
print(f"Error downloading asset {asset_id}: {e}") | |
print(f"Time Taken: {time.time()-time_start} seconds") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Function to get the value of a specific cookie by name | |
function getCookie(name) { | |
const value = `; ${document.cookie}`; | |
const parts = value.split(`; ${name}=`); | |
if (parts.length === 2) return parts.pop().split(';').shift(); | |
} | |
// Get the auth cookie | |
const authCookie = getCookie('auth'); | |
// Parse the auth cookie (it should be a JSON string containing the token) | |
if (authCookie) { | |
try { | |
const authData = JSON.parse(decodeURIComponent(authCookie)); | |
const authToken = authData.token; | |
console.log("Auth Token:", authToken); | |
} catch (error) { | |
console.error("Error parsing auth cookie:", error); | |
} | |
} else { | |
console.error("Auth cookie not found. Please make sure you are logged in."); | |
} |
@ProtoNoob sadly no... I kinda gave up on getting those to work. And I also can't point you in the right direction as I can't seem to make sense of that error..
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@ArtFreex I am having issues with brushes as well. Did you find a solution?
I have some ability to modify the code so if you can point me in the right direction I can probably take care of it.