@aindlq
Created March 19, 2025 12:14
Batch indexing with pastec
import csv
import os
import requests
from datetime import datetime
import argparse
import json
import glob
# Constants
API_URL = "http://localhost:4212/index/images/batch"
CSV_FOLDER = "csv_folder" # Default folder containing CSV files
ADD_LOG_DIR = "Add_log" # Directory for add logs
LOG_FILE = os.path.join(ADD_LOG_DIR, "pastec_batch_index_add_log.txt")
SUCCESS_LOG = os.path.join(ADD_LOG_DIR, "pastec_batch_index_add_success_log.csv")
FAILED_LOG = os.path.join(ADD_LOG_DIR, "pastec_batch_index_add_failed_log.csv")
INDEX_PATH = "/pastec/build/pastec-index/pastec_index.dat"  # Index location on the pastec server (defined but not referenced elsewhere in this script)
# Function to read the source CSV with auto-generated IDs
def read_source_csv(file_path, start_id):
    image_records = []
    current_id = start_id
    with open(file_path, mode="r", encoding="utf-8-sig") as csv_file:
        reader = csv.DictReader(csv_file)
        for row in reader:
            image_records.append({
                "image_id": current_id,
                "url": f"file://{row['file'].strip()}",
                "tag": row['photo'].strip()
            })
            current_id += 1
    return image_records, current_id
def ensure_log_directory():
    """Ensure the add log directory exists."""
    if not os.path.exists(ADD_LOG_DIR):
        os.makedirs(ADD_LOG_DIR)
        print(f"Created log directory: {ADD_LOG_DIR}")
# Function to log messages
def log_message(message, log_file=LOG_FILE):
    timestamp = datetime.now().isoformat()
    # Ensure the log directory exists
    ensure_log_directory()
    with open(log_file, mode="a") as file:
        file.write(f"[{timestamp}] {message}\n")
    print(f"[{timestamp}] {message}")
# Function to save successful and failed additions
def save_result_logs(results):
    ensure_log_directory()
    # Separate successful and failed results
    successful = []
    failed = []
    for result in results:
        if result["type"] == "IMAGE_ADDED":
            successful.append({
                "image_id": result["image_id"],
                "url": result["url"],
                "nb_features_extracted": result.get("nb_features_extracted", 0)
            })
        else:
            failed.append({
                "image_id": result["image_id"],
                "url": result["url"],
                "type": result["type"],
                "error": result.get("image_downloader_http_response_code", "Unknown error")
            })
    # Append to successful results
    with open(SUCCESS_LOG, mode="a", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=["image_id", "url", "nb_features_extracted"])
        if os.path.getsize(SUCCESS_LOG) == 0:  # Write header only if file is empty
            writer.writeheader()
        writer.writerows(successful)
    if successful:
        log_message(f"Saved {len(successful)} successful additions to {SUCCESS_LOG}")
    # Append to failed results
    with open(FAILED_LOG, mode="a", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=["image_id", "url", "type", "error"])
        if os.path.getsize(FAILED_LOG) == 0:  # Write header only if file is empty
            writer.writeheader()
        writer.writerows(failed)
    if failed:
        log_message(f"Saved {len(failed)} failed additions to {FAILED_LOG}")
# Function to send batch request to API
def send_batch_request(batch_data):
    try:
        headers = {
            'Content-Type': 'application/json'
        }
        log_message(f"Sending batch of {len(batch_data)} images to API...")
        response = requests.post(API_URL, json=batch_data, headers=headers)
        response_text = response.text
        try:
            response_json = json.loads(response_text)
            log_message(f"Batch request completed. Type: {response_json.get('type')}, Results: {len(response_json.get('results', []))} images")
            return response_json
        except json.JSONDecodeError:
            log_message(f"Failed to parse API response: {response_text}")
            return {"type": "ERROR", "results": []}
    except Exception as e:
        log_message(f"Error sending batch request: {str(e)}")
        return {"type": "ERROR", "results": []}
# Initialize output files
def initialize_output_files():
    ensure_log_directory()
    # Create empty success log file with header
    with open(SUCCESS_LOG, mode="w", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=["image_id", "url", "nb_features_extracted"])
        writer.writeheader()
    # Create empty failed log file with header
    with open(FAILED_LOG, mode="w", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=["image_id", "url", "type", "error"])
        writer.writeheader()
# Main function
def main():
    # Ensure the log directory exists
    ensure_log_directory()
    initialize_output_files()
    log_message(f"Starting Pastec batch indexing process for all CSV files in {CSV_FOLDER}.")
    try:
        # Get all CSV files in the folder
        csv_files = sorted(glob.glob(os.path.join(CSV_FOLDER, "*.csv")))
        if not csv_files:
            log_message(f"No CSV files found in {CSV_FOLDER}. Exiting.")
            return
        log_message(f"Found {len(csv_files)} CSV files to process.")
        # Initialize shared ID counter
        current_id = 1
        total_processed = 0
        # Process each CSV file sequentially
        for csv_file in csv_files:
            log_message(f"Processing file: {csv_file}")
            # Read CSV and get next ID
            images, current_id = read_source_csv(csv_file, current_id)
            log_message(f"Found {len(images)} images in {csv_file}")
            if not images:
                log_message(f"No valid images found in {csv_file}, skipping to next file.")
                continue
            # Send batch request for this CSV file
            response = send_batch_request(images)
            if response.get("type") == "BATCH_PROCESSED":
                results = response.get("results", [])
                # Count successful and failed
                successes = sum(1 for r in results if r.get("type") == "IMAGE_ADDED")
                failures = len(results) - successes
                log_message(f"Batch processing complete for {csv_file}. Successful: {successes}, Failed: {failures}")
                # Save logs
                save_result_logs(results)
                # Update total processed count
                total_processed += len(images)
                log_message(f"Total images processed so far: {total_processed}")
            else:
                log_message(f"Batch processing failed for {csv_file}. Response: {response}")
        log_message(f"All CSV files processed. Total images processed: {total_processed}")
    except Exception as e:
        log_message(f"Error during indexing process: {str(e)}")
# Command-line interface
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Perform Pastec API batch indexing on multiple CSV files.")
    parser.add_argument("--folder", type=str, default=CSV_FOLDER,
                        help=f"Path to the folder containing CSV files (default: {CSV_FOLDER})")
    args = parser.parse_args()
    # Override constants if provided via command line
    if args.folder != CSV_FOLDER:
        CSV_FOLDER = args.folder
    main()
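
Assuming the script is saved as pastec_batch_index.py (the gist itself does not show a filename), a typical run over a folder of CSV files looks like this:

    python pastec_batch_index.py --folder /path/to/csv_folder

If --folder is omitted, the script falls back to the csv_folder default. The sample input CSV below shows the two columns the script expects: photo (the source record URL, stored as the pastec tag) and file (the local path to the image, which is sent as a file:// URL).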
photo,file
https://artresearch.net/resource/frick/photo/991000003979707141/51323_POST.tif,/opt/data/images/frick/1000px/aHR0cHM6Ly9paWlmLmFydHJlc2VhcmNoLm5ldC9paWlmLzMvZnJpY2svNTEzMjNfUE9TVC50aWYvZnVsbC9mdWxsLzAvZGVmYXVsdC5qcGc.jpg
https://artresearch.net/resource/frick/photo/991000004599707141/51320_POST.tif,/opt/data/images/frick/1000px/aHR0cHM6Ly9paWlmLmFydHJlc2VhcmNoLm5ldC9paWlmLzMvZnJpY2svNTEzMjBfUE9TVC50aWYvZnVsbC9mdWxsLzAvZGVmYXVsdC5qcGc.jpg
aindlq commented Mar 19, 2025

The script goes through all CSV files in a folder and sends a single batch request per CSV file. Because pastec requires numerical IDs, the script just generates them with a counter. A sketch of the request it builds is shown below.
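
For reference, this is the JSON body the script would POST to http://localhost:4212/index/images/batch for the sample CSV above, assuming the ID counter starts at 1 and this is the first file processed:

    [
      {
        "image_id": 1,
        "url": "file:///opt/data/images/frick/1000px/aHR0cHM6Ly9paWlmLmFydHJlc2VhcmNoLm5ldC9paWlmLzMvZnJpY2svNTEzMjNfUE9TVC50aWYvZnVsbC9mdWxsLzAvZGVmYXVsdC5qcGc.jpg",
        "tag": "https://artresearch.net/resource/frick/photo/991000003979707141/51323_POST.tif"
      },
      {
        "image_id": 2,
        "url": "file:///opt/data/images/frick/1000px/aHR0cHM6Ly9paWlmLmFydHJlc2VhcmNoLm5ldC9paWlmLzMvZnJpY2svNTEzMjBfUE9TVC50aWYvZnVsbC9mdWxsLzAvZGVmYXVsdC5qcGc.jpg",
        "tag": "https://artresearch.net/resource/frick/photo/991000004599707141/51320_POST.tif"
      }
    ]

The script then expects a response of type BATCH_PROCESSED whose results array has one entry per image: type IMAGE_ADDED with an nb_features_extracted count on success, or a failure type with image_downloader_http_response_code set on error.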
