Created
January 25, 2025 06:34
-
-
Save Windowsfreak/b89b007f02dc65436256eb50c0b83fc7 to your computer and use it in GitHub Desktop.
Playground Downloader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import json | |
import csv | |
from os import mkdir | |
import time | |
import requests | |
from datetime import datetime | |
from PIL import Image | |
from io import BytesIO | |
import piexif | |
# download this file and save as "playground-DISPLAYNAME.json"
# https://playground.com/api/images/user?limit=100000&cursor=0&userId=INSERTHERE&id=INSERTHERE&likedImages=false&sortBy=Newest&filter=All&dateFilter=%7B%22start%22:null,%22end%22:null%7D
# Configuration
# NOTE: these module-level defaults are re-assigned per user inside run().
displayname = "user"
base_json_file = f"playground-{displayname}.json"  # API export to read
completed_batches_file = f"completed-batches-{displayname}.csv"  # resume log of finished batch ids
results_json_file = f"results-{displayname}.json"  # combined output JSON
operating_path = "../playground-images/"  # working directory for all files
# Utility Functions | |
# Translation table mapping every filesystem-invalid character (plus
# CR/LF/tab) to an underscore; built once at import time.
_INVALID_FILENAME_CHARS = str.maketrans({c: '_' for c in '<>:"/\\|?*\r\n\t'})


def sanitize_filename(filename):
    """Return *filename* with characters that are invalid on common file
    systems (``<>:"/\\|?*`` and CR/LF/tab) replaced by underscores.

    Uses str.translate for a single C-level pass instead of a per-character
    Python loop; the mapping is identical to the original join-based version.
    """
    return filename.translate(_INVALID_FILENAME_CHARS)
def read_completed_batches(file):
    """Load the set of already-processed batch ids from *file* (one id per
    line) inside ``operating_path``; an absent file yields an empty set."""
    path = operating_path + file
    if not os.path.exists(path):
        return set()
    with open(path, 'r') as handle:
        return set(handle.read().splitlines())
def write_completed_batch(file, batch_id):
    """Append *batch_id* as a single line to the open file object *file*.

    Falsy ids (empty string, None) are silently skipped.
    """
    if not batch_id:
        return
    file.write(batch_id + '\n')
def format_timestamp(timestamp):
    """Convert an ISO-8601 string (optionally 'Z'-suffixed) into the
    filename-safe form 'YYYY-MM-DD HH-MM-SS'."""
    naive = timestamp.replace('Z', '')
    parsed = datetime.fromisoformat(naive)
    return parsed.strftime('%Y-%m-%d %H-%M-%S')
def save_image(image_url, filename, prompt, artist):
    """Download *image_url* into ``operating_path + filename`` with retries,
    then embed *prompt* and *artist* as EXIF metadata (non-PNG files only).

    Raises RuntimeError (chained from the last network error) when every
    download attempt fails.
    """
    retries = 5
    timeout = 120  # per-request timeout in seconds
    delay = 20     # extra back-off applied after quick failures
    for attempt in range(1, retries + 1):
        start_time = time.time()
        try:
            response = requests.get(image_url, timeout=timeout)
            response.raise_for_status()
            image_data = BytesIO(response.content)
            with open(operating_path + filename, 'wb') as f:
                f.write(image_data.getvalue())
            break
        except (requests.exceptions.RequestException, ConnectionError) as e:
            elapsed_time = time.time() - start_time
            print(f"Attempt {attempt} failed: {e}")
            if attempt == retries:
                raise RuntimeError(f"Failed to download image after {retries} retries: {image_url}") from e
            # Only add the long back-off when the attempt failed quickly;
            # slow failures already burned enough wall-clock time.
            if elapsed_time < 60:
                time.sleep(delay)
            time.sleep(1)
    # PNGs are left untouched: piexif handles EXIF for JPEG/TIFF data.
    if filename.endswith('.png'):
        return
    # Add EXIF metadata to the saved file
    try:
        exif_dict = piexif.load(image_data.getvalue())
        exif_dict['0th'][piexif.ImageIFD.ImageDescription] = prompt.encode('utf-8')
        exif_dict['0th'][piexif.ImageIFD.Artist] = artist.encode('utf-8')
        exif_bytes = piexif.dump(exif_dict)
        piexif.insert(exif_bytes, operating_path + filename)
    except Exception as e:
        # Best-effort: a metadata failure must not discard the downloaded
        # image. BUGFIX: the message previously printed the literal
        # "(unknown)" instead of the actual filename.
        print(f"Error adding EXIF metadata to {filename}: {e}")
def group_by_batch(data):
    """Bucket image records by their 'batchId'.

    Returns a dict mapping batch id -> {'images': [records], 'userId',
    'userName', 'createdAt': [parsed datetimes, one per image]}.
    """
    grouped = {}
    for record in data:
        key = record['batchId']
        if key not in grouped:
            grouped[key] = {
                'images': [],
                'userId': record['userId'],
                'userName': record['user']['displayName'],
                'createdAt': [],
            }
        bucket = grouped[key]
        bucket['images'].append(record)
        created = datetime.fromisoformat(record['createdAt'].replace('Z', ''))
        bucket['createdAt'].append(created)
    return grouped
def process_batch(batch_id, batch_data, completed_batches, file, displayname):
    """Download every image of one batch, write the batch's JSON sidecar,
    record the batch id as completed, and return the slimmed-down batch dict.

    Returns None when *batch_id* is already in *completed_batches*.
    """
    if batch_id in completed_batches:
        return None
    # Filenames are keyed on the earliest creation time plus a sanitized
    # prompt prefix.
    earliest = min(batch_data['createdAt'])
    timestamp = format_timestamp(earliest.isoformat())
    prompt = sanitize_filename(batch_data['images'][0]['prompt'][:70])
    batch_json_name = f"{timestamp} {prompt}.json"
    images = []
    for counter, image in enumerate(batch_data['images'], start=1):
        extension = image['url'].split('.')[-1]  # file extension, e.g. jpg or png
        image_filename = f"{timestamp} {prompt} {counter}.{extension}"
        save_image(image['url'], image_filename, image['prompt'], displayname)
        image['url'] = image_filename  # point the JSON at the local copy
        image.pop('url_jpeg', None)    # drop remote-only fields if present
        image.pop('user', None)
        images.append(image)
    batch_json = {
        "images": images,
        "userId": batch_data['userId'],
        "userName": batch_data['userName'],
    }
    # Create batch JSON file
    with open(operating_path + batch_json_name, 'w') as sidecar:
        json.dump(batch_json, sidecar, separators=(',', ':'))
    write_completed_batch(file, batch_id)
    return batch_json
def run(displayname):
    """Process one user's exported JSON: group images into batches, download
    every batch, and write a combined results file for that user."""
    print(f"Processing {displayname}...")
    # Re-point the module-level configuration at this user's files.
    global base_json_file, completed_batches_file, results_json_file, operating_path
    base_json_file = f"playground-{displayname}.json"
    completed_batches_file = f"completed-batches-{displayname}.csv"
    results_json_file = f"results-{displayname}.json"
    operating_path = f"../playground-images/{displayname}/"
    try:
        mkdir(operating_path)
    except FileExistsError:
        pass  # per-user directory already exists from an earlier run
    with open("../playground-images/" + base_json_file, 'r') as source:
        data = json.load(source)
    completed_batches = read_completed_batches(completed_batches_file)
    grouped_batches = group_by_batch(data['images'])
    results = []
    # Keep the resume log open in append mode so each finished batch is
    # recorded immediately.
    with open(operating_path + completed_batches_file, 'a') as batch_file:
        for batch_id, batch in grouped_batches.items():
            outcome = process_batch(batch_id, batch, completed_batches, batch_file, displayname)
            if outcome:
                results.append(outcome)
    # Save results JSON
    with open(operating_path + results_json_file, 'w') as sink:
        json.dump({"images": results}, sink, separators=(',', ':'))
def main():
    """Discover every '../playground-images/playground-USERNAME.json' export
    and process each user, smallest export file first."""
    # Path to the folder containing the JSON files
    json_folder = "../playground-images/"
    # Look for all files matching the pattern `playground-USERNAME.json`
    exports = [
        name for name in os.listdir(json_folder)
        if name.startswith("playground-") and name.endswith(".json")
    ]
    # Smallest exports first so quick users finish before large ones.
    exports.sort(key=lambda name: os.path.getsize(os.path.join(json_folder, name)))
    for name in exports:
        username = name[len("playground-"):-len(".json")]
        run(username)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment