@Windowsfreak
Created January 25, 2025 06:34
Playground Downloader
import os
import json
import time
from datetime import datetime
from io import BytesIO
from os import mkdir

import requests
import piexif
# Input: export your image list from Playground and save it as
# "playground-DISPLAYNAME.json" inside the ../playground-images/ folder.
# Replace INSERTHERE with your user ID in the URL below; an optional fetch
# helper is sketched after the configuration block.
# https://playground.com/api/images/user?limit=100000&cursor=0&userId=INSERTHERE&id=INSERTHERE&likedImages=false&sortBy=Newest&filter=All&dateFilter=%7B%22start%22:null,%22end%22:null%7D

# Configuration
displayname = "user"
base_json_file = f"playground-{displayname}.json"
completed_batches_file = f"completed-batches-{displayname}.csv"
results_json_file = f"results-{displayname}.json"
operating_path = "../playground-images/"
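
# Optional: a minimal sketch of fetching the export programmatically instead of
# saving it by hand. The endpoint and query string are copied from the URL above;
# it is an assumption that the call works without authentication, so you may need
# to add cookies or headers from a logged-in browser session. fetch_export() is
# illustrative and is not called anywhere in this script.
def fetch_export(user_id, displayname):
    url = (
        "https://playground.com/api/images/user"
        f"?limit=100000&cursor=0&userId={user_id}&id={user_id}"
        "&likedImages=false&sortBy=Newest&filter=All"
        "&dateFilter=%7B%22start%22:null,%22end%22:null%7D"
    )
    response = requests.get(url, timeout=120)
    response.raise_for_status()
    with open(f"../playground-images/playground-{displayname}.json", "w", encoding="utf-8") as f:
        f.write(response.text)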

# Utility Functions
def sanitize_filename(filename):
    # Replace only characters that are invalid in file names
    invalid_chars = '<>:"/\\|?*\r\n\t'
    return ''.join(c if c not in invalid_chars else '_' for c in filename)

def read_completed_batches(file):
    # Return the set of batch IDs that were already downloaded in a previous run
    if os.path.exists(operating_path + file):
        with open(operating_path + file, 'r') as f:
            return set(f.read().splitlines())
    return set()

def write_completed_batch(file, batch_id):
    if batch_id:
        file.write(batch_id + '\n')

def format_timestamp(timestamp):
    # ISO timestamp -> "YYYY-MM-DD HH-MM-SS" (filesystem-safe, no colons)
    return datetime.fromisoformat(timestamp.replace('Z', '')).strftime('%Y-%m-%d %H-%M-%S')

def save_image(image_url, filename, prompt, artist):
    retries = 5
    timeout = 120
    delay = 20
    for attempt in range(1, retries + 1):
        start_time = time.time()
        try:
            response = requests.get(image_url, timeout=timeout)
            response.raise_for_status()
            image_data = BytesIO(response.content)
            with open(operating_path + filename, 'wb') as f:
                f.write(image_data.getvalue())
            break
        except (requests.exceptions.RequestException, ConnectionError) as e:
            elapsed_time = time.time() - start_time
            print(f"Attempt {attempt} failed: {e}")
            if attempt == retries:
                raise RuntimeError(f"Failed to download image after {retries} retries: {image_url}") from e
            # Only back off if the request failed quickly; slow failures already waited
            if elapsed_time < 60:
                time.sleep(delay)
    # Small pause between downloads
    time.sleep(1)
    # PNG files are skipped; EXIF metadata is only embedded in JPEGs
    if filename.endswith('.png'):
        return
    # Add EXIF metadata (prompt and artist) to the saved file
    try:
        exif_dict = piexif.load(image_data.getvalue())
        exif_dict['0th'][piexif.ImageIFD.ImageDescription] = prompt.encode('utf-8')
        exif_dict['0th'][piexif.ImageIFD.Artist] = artist.encode('utf-8')
        exif_bytes = piexif.dump(exif_dict)
        piexif.insert(exif_bytes, operating_path + filename)
    except Exception as e:
        print(f"Error adding EXIF metadata to {filename}: {e}")

def group_by_batch(data):
    # Group the flat image list by batchId, keeping user info and all timestamps
    grouped = {}
    for image in data:
        batch_id = image['batchId']
        if batch_id not in grouped:
            grouped[batch_id] = {
                'images': [],
                'userId': image['userId'],
                'userName': image['user']['displayName'],
                'createdAt': []
            }
        grouped[batch_id]['images'].append(image)
        grouped[batch_id]['createdAt'].append(datetime.fromisoformat(image['createdAt'].replace('Z', '')))
    return grouped
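
# Shape of the returned mapping (illustrative):
# {
#     "<batchId>": {
#         "images": [<raw image dicts from the export>],
#         "userId": "<userId>",
#         "userName": "<displayName>",
#         "createdAt": [datetime, ...],
#     },
#     ...
# }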

def process_batch(batch_id, batch_data, completed_batches, file, displayname):
    if batch_id in completed_batches:
        return None
    images = []
    # Use the earliest createdAt of the batch as its timestamp
    batch_timestamp = min(batch_data['createdAt'])
    timestamp = format_timestamp(batch_timestamp.isoformat())
    prompt = sanitize_filename(batch_data['images'][0]['prompt'][:70])
    counter = 1
    # Prepare filenames
    batch_json_name = f"{timestamp} {prompt}.json"
    for image in batch_data['images']:
        extension = image['url'].split('.')[-1]  # Get the file extension (e.g., jpg or png)
        image_filename = f"{timestamp} {prompt} {counter}.{extension}"
        save_image(image['url'], image_filename, image['prompt'], displayname)
        image['url'] = image_filename  # Update URL in batch JSON
        image.pop('url_jpeg', None)  # Remove url_jpeg if present
        image.pop('user', None)  # Remove user key if present
        images.append(image)
        counter += 1
    # Create batch JSON file
    with open(operating_path + batch_json_name, 'w') as f:
        batch_data = {
            "images": images,
            "userId": batch_data['userId'],
            "userName": batch_data['userName'],
        }
        json.dump(batch_data, f, separators=(',', ':'))
    write_completed_batch(file, batch_id)
    return batch_data

def run(displayname):
    print(f"Processing {displayname}...")
    global base_json_file, completed_batches_file, results_json_file, operating_path
    base_json_file = f"playground-{displayname}.json"
    completed_batches_file = f"completed-batches-{displayname}.csv"
    results_json_file = f"results-{displayname}.json"
    operating_path = f"../playground-images/{displayname}/"
    try:
        mkdir(operating_path)
    except FileExistsError:
        pass
    with open("../playground-images/" + base_json_file, 'r') as f:
        data = json.load(f)
    completed_batches = read_completed_batches(completed_batches_file)
    grouped_batches = group_by_batch(data['images'])
    results = []
    with open(operating_path + completed_batches_file, 'a') as batch_file:
        for batch_id, batch_data in grouped_batches.items():
            batch_result = process_batch(batch_id, batch_data, completed_batches, batch_file, displayname)
            if batch_result:
                results.append(batch_result)
    # Save results JSON
    with open(operating_path + results_json_file, 'w') as f:
        json.dump({"images": results}, f, separators=(',', ':'))

def main():
    # Path to the folder containing the JSON files
    json_folder = "../playground-images/"
    # Look for all files matching the pattern `playground-USERNAME.json`
    json_files = [f for f in os.listdir(json_folder) if f.startswith("playground-") and f.endswith(".json")]
    # Process smaller exports first
    json_files.sort(key=lambda f: os.path.getsize(os.path.join(json_folder, f)))
    # Iterate over each file and extract the username
    for json_file in json_files:
        run(json_file[len("playground-"):-len(".json")])

if __name__ == "__main__":
    main()
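
# Usage summary: place one or more exports named playground-<DISPLAYNAME>.json
# inside ../playground-images/ relative to this script, install the requests and
# piexif packages, and run the script with Python 3. For each user it creates
# ../playground-images/<DISPLAYNAME>/ containing the downloaded images, one JSON
# file per batch, a combined results-<DISPLAYNAME>.json, and a
# completed-batches-<DISPLAYNAME>.csv that lets interrupted runs resume without
# re-downloading finished batches.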