-
-
Save PolyShifter/4e730aae0256beb882c909d643b46661 to your computer and use it in GitHub Desktop.
import zipfile | |
import os | |
import json | |
import concurrent.futures | |
import requests | |
import shutil | |
import time | |
######## INITIAL SETUP ########
# Define the token, download path, and target category.
# TOKEN is sent as the "Authorization: Bearer <TOKEN>" header when requesting
# a download id.
TOKEN = "Your Token Here"
# Update with the correct path to the directory.
# Example: "C:/Users/MyAccount/Documents/QuixelZips"
# When left empty, the per-category sub-path is used as a relative path.
DOWNLOAD_PATH = ""
# Update with the correct path to the ms_asset_categories.json file, not the
# directory.
# Example: "C:/Users/MyAccount/Documents/QuixelZips/ms_asset_categories.json"
JSON_FILE_PATH = "./ms_asset_categories.json"
# Pick a folder to store the download cache at (a text file). Then create the
# text file cache.txt there. Paste it below.
# e.g. "C:/Users/MyAccount/Documents/Quixel/cache.txt"
# The cache holds one asset id per line; ids listed there are skipped unless
# OVERWRITE is enabled.
CACHE_FILE_PATH = "./cache.txt"
# Download from https://github.com/WAUthethird/quixel-megascans-scripts -->
# "complete_asset_metadata.tar.zst", then extract it.
ASSET_METADATA_PATH = "./asset_metadata.json"
## Set target download category/categories.
# Several categories can be combined with "/"; an asset matches only when
# every part is found in its categories.
# working: 3d asset, 3d plant, surface, brush, displacement, imperfection, decal
TARGET_CATEGORY = "3d assets"
# Create subdirectories based on the category
SAVE_IN_SUBDIRECTORIES = True
# Download HighPoly?
HIGHPOLY = False
# Download ztool (ZBrush) file?
ZTOOL = False
# Use to overwrite existing cached items. (Example: if you want to download a
# different size texture, or if they messed up and you had to adjust the script
# to try again.)
OVERWRITE = False
# Set a limit for the number of items you want to download. 0 means no limit
MAX_ITEM_COUNT = 0
# Specify the image type and resolution for all textures, or for default.
# If a value is not specified, then it will pull from the default item.
# If the resolution specified does not exist for that item (surface, asset, etc)
# then it will default to the highest resolution available for that item.
# So if you specify "8192x8192" but the highest that exists is 2048x2048, then it
# will download 2048x2048 instead. Note that some textures are not square, and
# so the highest resolution available will still download, e.g. 2048x1024.
MIME_TEXTURE_TYPES = {
    "default": {"mimeType": "image/jpeg", "resolution": "4096x4096"},
    # "albedo": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "ao": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "brush": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "bump": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "cavity": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "curvature": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "diffuse": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    "displacement": {"mimeType": "image/x-exr", "resolution": "8192x8192"},
    # "f": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "fuzz": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "gloss": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "mask": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "metalness": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    "normal": {"mimeType": "image/x-exr", "resolution": "4096x4096"},
    # "normalbump": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "normalobject": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "occlusion": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "opacity": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "roughness": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "specular": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "thickness": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "translucency": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
    # "transmission": {"mimeType": "image/jpeg", "resolution": "2048x2048"},
}
######## SETUP - FINISHED ########
def normalize_category(category):
    """Return ``category`` trimmed, lower-cased and without trailing "s".

    Surrounding whitespace is removed first. Every trailing "s" character is
    dropped, not just one, so "grass" becomes "gra".
    """
    lowered = category.strip().lower()
    return lowered.rstrip("s")
def categories_to_string(categories, separator=" "):
    """Flatten a nested category mapping into one ``separator``-joined string.

    For each key of ``categories``:

    * a non-empty dict value contributes ``key + separator + <flattened
      value>``, or just ``key`` when the flattened value is empty;
    * a list value contributes each of its items, normalized with
      ``normalize_category``;
    * any other value (including an empty dict) contributes the normalized
      key.

    Anything that is not a dict flattens to an empty string.
    """
    parts = []
    entries = categories.items() if isinstance(categories, dict) else ()
    for name, child in entries:
        if isinstance(child, dict) and child:
            nested = categories_to_string(child, separator)
            parts.append(f"{name}{separator}{nested}" if nested else name)
        elif isinstance(child, list):
            parts.extend(normalize_category(entry) for entry in child)
        else:
            parts.append(normalize_category(name))
    return separator.join(parts)
def get_asset_download_id(asset_id):
    """Request a download id for ``asset_id`` from the Quixel downloads API.

    The payload built by ``get_asset_payload`` is POSTed to the API. When the
    API answers with an ``INVALID_PAYLOAD`` error whose message is
    ``<json> type not found``, the component named in that JSON is removed
    from the payload and the request is repeated.

    Args:
        asset_id: Id of the asset to request a download for.

    Returns:
        The ``"id"`` field of the JSON response on HTTP 200. ``None`` when no
        payload could be built, when the API reports an error that cannot be
        handled by dropping a component, or when the attempts are used up.

    Note:
        An HTTP 401 response terminates the whole process with
        ``os._exit(0)``.
    """
    url = "https://quixel.com/v1/downloads"
    payload = get_asset_payload(asset_id)
    # get_asset_payload returns False for unknown assets; there is nothing to
    # request in that case.
    if not payload:
        return None

    components = payload["components"]
    # One attempt per component that may be dropped, but always at least one
    # so assets with zero or one component are still requested.
    max_attempts = max(1, len(components) - 1)

    for _ in range(max_attempts):
        response = requests.post(
            url,
            headers={"Authorization": "Bearer " + TOKEN},
            json=payload,
        )
        status = response.status_code
        if status == 200:
            print(f"Found Download URL for {asset_id}")
            return response.json()["id"]
        if status == 401:
            print((
                "Possible expired token. Please get a new one from "
                "https://quixel.com/megascans/home/ and then update the script.\n"
                "If you just ran the script and downloaded stuff prior to this, "
                "just re-run the script and try again."
            ))
            # Called from a worker thread: os._exit stops every thread at once.
            os._exit(0)

        print(f"Failed to get asset download url for id: {asset_id}")
        try:
            error = response.json()
        except ValueError:
            # Error body is not JSON (e.g. a gateway error page).
            print("DEBUG_ERROR: " + str(response.text))
            return None
        if not isinstance(error, dict):
            print("DEBUG_ERROR: " + str(error))
            return None

        message = str(error.get("msg", ""))
        if error.get("code") != "INVALID_PAYLOAD" or "type not found" not in message:
            print("DEBUG_ERROR: " + str(error))
            return None

        # The message has the form '<json> type not found'; the JSON names
        # the component type the API rejected.
        try:
            rejected = json.loads(message.replace(" type not found", ""))
        except ValueError:
            print("DEBUG_ERROR: " + str(error))
            return None
        if rejected:
            for idx, component in enumerate(components):
                if component["type"] == rejected["type"]:
                    components.pop(idx)
                    print("Removed "+rejected["type"]+" from payload. Trying again...")
                    break
    return None
def check_zip_file(file_path):
    """Test every member of the zip archive at ``file_path``.

    Prints the name of the first corrupt member, if there is one.

    Args:
        file_path: Path of the zip archive to test.

    Raises:
        zipfile.BadZipFile: If the file is not a valid zip archive.
    """
    # Close the archive before returning so the caller can delete or
    # overwrite the file (an open handle blocks os.remove on Windows).
    with zipfile.ZipFile(file_path) as the_zip_file:
        result = the_zip_file.testzip()
    if result is not None:
        print("!! First bad file in zip: %s" % result)
def download_asset(download_id, download_directory, asset_id):
    """Download the archive for ``download_id`` into ``download_directory``.

    The file name comes from the ``Content-Disposition`` response header, or
    is ``download_id`` when the header is missing. When ``asset_id`` is a key
    of the module-level ``asset_metadata["asset_metadata"]`` mapping, the file
    is named ``<asset name>_<asset_id><ext>`` instead. After writing, the
    archive is tested with ``check_zip_file``.

    Broken connections and invalid archives are retried, up to five attempts
    in total, waiting 3 seconds longer before each new attempt. Partial or
    invalid files are removed before retrying.

    Args:
        download_id: Id returned by the downloads API.
        download_directory: Directory to write into; created if missing.
        asset_id: Id of the asset, used for naming the file.

    Returns:
        ``True`` when the archive was downloaded and opened as a zip file,
        ``False`` on a non-200 response or when all attempts failed.
    """
    os.makedirs(download_directory, exist_ok=True)
    url = (
        f"https://assetdownloads.quixel.com/download/{download_id}"
        "?preserveStructure=True&url=https%3A%2F%2Fquixel.com%2Fv1%2Fdownloads"
    )
    max_attempts = 5
    delay = 3

    for attempt in range(1, max_attempts + 1):
        file_path = None
        try:
            # One streamed request per attempt; the context manager releases
            # the connection whatever happens.
            with requests.get(url, stream=True) as response:
                if response.status_code != 200:
                    if response.status_code == 400:
                        # Print the response to see what's causing the issue
                        print(f"Error 400: {response.text}")
                    print((
                        f"ERROR: Failed to download asset {download_id}, status code: "
                        f"{response.status_code}\nResponse: {response.text}"
                    ))
                    return False

                content_disposition = response.headers.get("Content-Disposition")
                if content_disposition:
                    filename = content_disposition.split("filename=")[-1].strip('"')
                else:
                    filename = download_id
                asset = asset_metadata["asset_metadata"].get(asset_id)
                if asset:
                    asset_name = asset["name"].strip().replace(" ", "_")
                    ext = os.path.splitext(filename)[-1]
                    base_path = os.path.dirname(filename)
                    filename = os.path.join(base_path, f"{asset_name}_{asset_id}{ext}")
                file_path = os.path.join(download_directory, filename)

                print(f"Downloading file: {file_path} ###")
                with open(file_path, 'wb') as f:
                    # iter_content reports a broken transfer as a requests
                    # exception, which the handler below retries.
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        f.write(chunk)
            check_zip_file(file_path.replace('\\', '/'))
            return True
        except (
            requests.exceptions.ChunkedEncodingError,
            requests.exceptions.ConnectionError,
        ) as e:
            print(f"Error during download: {e}")
        except zipfile.BadZipFile:
            print((
                "Bad zip file found, removing zip file and attempting to "
                "redownload..."
            ))

        # Only failed attempts reach this point: drop the unusable file and
        # back off before trying again.
        if file_path and os.path.exists(file_path):
            os.remove(file_path)
        if attempt < max_attempts:
            time.sleep(delay)
            delay += 3

    print(f"Exceeded maximum retry attempts for asset {download_id}")
    return False
# Function to handle downloading for threading
def download_asset_with_id(asset):
    """Resolve the download id for ``asset`` and download its archive.

    ``asset`` is a dict with the keys ``"asset_id"`` and ``"path"``. The
    lower-cased ``"path"`` is used as a sub-directory of ``DOWNLOAD_PATH``
    when ``SAVE_IN_SUBDIRECTORIES`` is enabled.

    Returns the result of ``download_asset``, or ``False`` when no download
    id could be obtained.
    """
    asset_id = asset["asset_id"]
    subdirectory = asset["path"].lower() if SAVE_IN_SUBDIRECTORIES else ""
    download_id = get_asset_download_id(asset_id)
    if not download_id:
        print(f"No download id found for {asset_id}.")
        return False
    target_directory = os.path.join(DOWNLOAD_PATH, subdirectory)
    return download_asset(download_id, target_directory, asset_id)
def _resolution_size(resolution):
    """Return width + height of a ``"<width>x<height>"`` resolution string."""
    width, height = resolution.split("x")
    return int(width) + int(height)


def get_asset_payload(asset_id):
    """Build the downloads-API payload for ``asset_id``.

    The asset is looked up in the module-level ``asset_metadata`` by its
    ``full_metadata["id"]``. For every texture type of the asset, the mime
    type and resolution come from ``MIME_TEXTURE_TYPES``; keys missing from a
    per-texture entry are taken from its ``"default"`` entry. When the
    requested resolution is not available for a texture, the available
    resolution with the largest width + height is used instead.

    Args:
        asset_id: Id of the asset to build the payload for.

    Returns:
        A dict with the keys ``"asset"``, ``"config"`` and ``"components"``,
        or ``False`` when the asset is not present in the metadata.
    """
    # Stop at the first match instead of collecting every match.
    asset = next(
        (
            entry
            for entry in asset_metadata["asset_metadata"].values()
            if entry["full_metadata"]["id"] == asset_id
        ),
        None,
    )
    if asset is None:
        print(f"Asset {asset_id} not found in asset_metadata.json!")
        return False

    full_metadata = asset["full_metadata"]
    # Maps texture type -> list of available "<width>x<height>" strings.
    texture_types = {}
    if "components" in full_metadata:
        for component in full_metadata["components"]:
            texture_types[component["type"]] = [
                resolution_dict["resolution"]
                for uri in component["uris"]
                for resolution_dict in uri["resolutions"]
            ]
    else:
        # Surfaces
        for texture_map in full_metadata["maps"]:
            texture_types.setdefault(texture_map["type"], []).append(
                texture_map["resolution"]
            )

    default_settings = MIME_TEXTURE_TYPES["default"]
    asset_components = []
    for texture, resolutions in texture_types.items():
        # Per-texture values win; anything not specified there falls back to
        # the default entry.
        settings = {**default_settings, **MIME_TEXTURE_TYPES.get(texture, {})}
        resolution_requested = settings["resolution"]
        # Get highest AVAILABLE resolution for that texture
        if resolutions and resolution_requested not in resolutions:
            resolution_requested = max(resolutions, key=_resolution_size)
        asset_components.append({
            "type": texture,
            "mimeType": settings["mimeType"],
            "resolution": resolution_requested,
        })

    return {
        "asset": asset_id,
        "config": {
            "highpoly": HIGHPOLY,
            "lowerlod_meshes": True,
            "lowerlod_normals": True,
            "ztool": ZTOOL,
            "brushes": True,
            "meshMimeType": "application/x-fbx",
            "albedo_lods": True,
        },
        "components": asset_components,
    }
# Script entry point: select the assets of TARGET_CATEGORY that still need
# downloading, ask for confirmation, then download them in parallel and record
# each successful download in the cache file.
if __name__ == "__main__":
    # Maps asset id -> (possibly nested) categories of that asset.
    with open(JSON_FILE_PATH, 'r') as f:
        asset_categories_dict = json.load(f)
    print(f"Trying to match against target category: {TARGET_CATEGORY}")
    # Load cached assets (one asset id per line)
    cached_assets = set()
    if os.path.exists(CACHE_FILE_PATH):
        with open(CACHE_FILE_PATH, "r") as cache_file:
            cached_assets = set(cache_file.read().splitlines())
    # Normalize target category for matching; "/" separates the parts that
    # must all match.
    normalized_target_categories = [normalize_category(part) for part in TARGET_CATEGORY.split("/")]
    matching_asset_ids = []
    # Check matches for each asset in the loaded categories
    for asset_id, categories in asset_categories_dict.items():
        # Convert the categories to a single string for matching
        categories_str = categories_to_string(categories)
        # Same categories joined with "/", used as the download sub-directory
        categories_path = categories_to_string(categories, "/")
        # Check if all parts of target_category exist in the categories string
        # (substring match, not whole-word match)
        matches = all(normalize_category(part) in categories_str.lower() for part in normalized_target_categories)
        if matches and asset_id not in cached_assets:
            matching_asset_ids.append({"asset_id": asset_id, "path": categories_path})
        # Already-cached assets are only queued again when OVERWRITE is set
        elif matches and asset_id in cached_assets and OVERWRITE:
            matching_asset_ids.append({"asset_id": asset_id, "path": categories_path})
    if not matching_asset_ids:
        print("No new assets found for the target category.")
        exit()
    print(f"{len(matching_asset_ids)} assets found.")
    # Allow user to decide how many to download at once.
    if MAX_ITEM_COUNT:
        item_count = min(len(matching_asset_ids), MAX_ITEM_COUNT)
    else:
        item_count = len(matching_asset_ids)
    confirmation = input((
        f"Do you want to download {item_count} {TARGET_CATEGORY} assets? "
        "(y/n): "
    )).strip().lower()
    if confirmation != "y":
        print("Download canceled.")
        exit()
    # asset_metadata becomes a module-level name here; get_asset_payload and
    # download_asset read it as a global.
    try:
        with open(ASSET_METADATA_PATH, "r", encoding="utf-8") as f:
            asset_metadata = json.load(f)
    except FileNotFoundError:
        print((
            "Couldn't find asset_metadata.json in the directory you selected, "
            f"{ASSET_METADATA_PATH}. Quitting."
        ))
        exit()
    time_start = time.time()
    # "a+" appends to the cache and creates the file when it does not exist.
    with open(CACHE_FILE_PATH, "a+") as cache_file:
        # Use threading for faster downloading
        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            download_count = item_count
            # Map each future back to the asset it downloads; only the first
            # item_count matches are submitted.
            futures = {executor.submit(download_asset_with_id, asset): asset for asset in matching_asset_ids[:item_count]}
            for future in concurrent.futures.as_completed(futures):
                asset = futures[future]
                asset_id = asset["asset_id"]
                try:
                    result = future.result()
                    if result:
                        # Only successful downloads reduce the counter, so
                        # failed assets stay counted as remaining.
                        download_count-=1
                        print(f"{download_count} remaining items to download.")
                        # Add the asset to the cache file after successful download
                        cache_file.write(f"{asset_id}\n")
                        # Flush right away so the entry is written out before
                        # the next download finishes.
                        cache_file.flush()
                except Exception as e:
                    print(f"Error downloading asset {asset_id}: {e}")
    print(f"Time Taken: {time.time()-time_start} seconds")
// Look up a single cookie by name in document.cookie.
// Returns the cookie's value, or undefined when "; <name>=" does not occur
// exactly once in the cookie string.
function getCookie(name) {
  const haystack = '; ' + document.cookie;
  const pieces = haystack.split('; ' + name + '=');
  if (pieces.length !== 2) {
    return undefined;
  }
  // Everything after "<name>=" up to the next ";" is the value.
  const [cookieValue] = pieces[1].split(';');
  return cookieValue;
}
// Read the "auth" cookie of the current page
const authCookie = getCookie('auth');

// The cookie value is expected to be URI-encoded JSON with a "token" field
if (!authCookie) {
  console.error("Auth cookie not found. Please make sure you are logged in.");
} else {
  try {
    const decodedCookie = decodeURIComponent(authCookie);
    const { token: authToken } = JSON.parse(decodedCookie);
    console.log("Auth Token:", authToken);
  } catch (error) {
    // Reached when the cookie cannot be decoded or is not valid JSON
    console.error("Error parsing auth cookie:", error);
  }
}
Hey @PolyShifter,
Using the script on Surfaces and going well. Downloading 2k versions and getting about 1000-2000 files per hour (getting faster as the night goes on... probably because of less traffic). I am getting a significant number of 'connection broken' errors though (about 1 out of 10). Any suggestions?
Also, are there any categories that will be problematic? Any workarounds?
Did you ever get 3D assets working?
@ArtFreex I am having issues with brushes as well. Did you find a solution?
I have some ability to modify the code so if you can point me in the right direction I can probably take care of it.
@ProtoNoob sadly no... I kinda gave up on getting those to work. And I also can't point you in the right direction as I can't seem to make sense of that error..
Have you tried
pip install requests
?