Last active
April 30, 2024 04:43
-
-
Save anil3a/d199ab41dd6bb5df4d50ac4e664be25a to your computer and use it in GitHub Desktop.
Query the Immich Postgres database for all assets to find any images that are missing from the upload path.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
services:
  # ... existing containers from immich docker-compose.yml

  # Idle Python container used to run the missing-asset check script by hand.
  debug:
    image: python:3.11.9-bullseye
    container_name: immich_debugger
    # Keep the container alive so commands can be run inside it.
    command: tail -f /dev/null
    # Supplies the DB_* variables that the script reads from the environment.
    env_file:
      - stack.env
    volumes:
      # The script is configured with parent_path "/usr/src/app/", so the
      # upload directory is mounted beneath that prefix.
      - ${UPLOAD_LOCATION}:/usr/src/app/upload
    depends_on:
      - immich-server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import psycopg2 | |
import csv | |
from tqdm import tqdm | |
class PostgreSQLConnector: | |
def __init__(self, dbname, user, password, host='localhost', port='5432', parent_path='/var/www/'): | |
self.dbname = dbname | |
self.user = user | |
self.password = password | |
self.host = host | |
self.port = port | |
self.connection = None | |
self.cursor = None | |
self.parent_path = parent_path | |
def connect(self): | |
try: | |
self.connection = psycopg2.connect( | |
dbname=self.dbname, | |
user=self.user, | |
password=self.password, | |
host=self.host, | |
port=self.port | |
) | |
self.cursor = self.connection.cursor() | |
print("Connected to the database.") | |
except psycopg2.Error as e: | |
print(f"Error connecting to the database: {e}") | |
def close_connection(self): | |
if self.connection: | |
self.connection.close() | |
print("Connection to the database closed.") | |
def execute_query(self, query): | |
try: | |
self.cursor.execute(query) | |
self.connection.commit() | |
except psycopg2.Error as e: | |
print(f"Error executing query: {e}") | |
def get_assets(self, limit=10, where="") -> list: | |
query = f"SELECT id, \"originalPath\", \"thumbnailPath\" FROM assets {where} limit {limit};" | |
self.execute_query(query) | |
return self.cursor.fetchall() | |
def get_assets_all_fields(self, limit=10, where="") -> list: | |
query = f"SELECT * FROM assets {where} limit {limit};" | |
self.execute_query(query) | |
return self.cursor.fetchall() | |
def list_missing_files(self, table="assets", limit=10): | |
rows = self.get_assets(limit=limit) | |
missing_files_count = 0 | |
print("{:<38} {:<8} {:<8} {:<}".format("ID", "Origin", "Thumb", "OPath")) | |
print("-" * 120) | |
for row in rows: | |
id, original, thumb = row | |
original_exists = self.check_file_exists(original) | |
thumb_exists = self.check_file_exists(thumb) | |
if not (original_exists and thumb_exists): | |
print( | |
"{:<38} {:<8} {:<8} {:<}".format( | |
id, "YES" if original_exists else "NO", "YES" if thumb_exists else "NO", original | |
) | |
) | |
missing_files_count += 1 | |
# row_data.append({ | |
# "id" : id, | |
# "original" : original_exists, | |
# "original_path" : f"{self.parent_path}{original}", | |
# "thumb" : thumb_exists, | |
# "thumb_path" : f"{self.parent_path}{thumb}", | |
# }) | |
if missing_files_count < 1: | |
print("No missing assets found") | |
else: | |
print(f"Number of missing assets: {missing_files_count}") | |
# print(row_data) | |
def export_missing_files(self, table="assets",limit=10): | |
rows = self.get_assets_all_fields(limit=limit) | |
row_data = [] | |
output_file = 'missing_assets.csv' | |
print("Looping all rows and rewriting into file for missing assets....") | |
missing_files_count = 0 | |
with open(output_file, 'w', newline='') as csvfile: | |
writer = csv.writer(csvfile) | |
header_row = [col_desc[0] for col_desc in self.cursor.description] | |
original_index = header_row.index('originalPath') | |
thumbnail_index = header_row.index('thumbnailPath') | |
header_row.insert(0, "Original exists") | |
header_row.insert(1, "Thumbnail exists") | |
writer.writerow(header_row) | |
for row in tqdm(rows, desc="Processing all rows", unit="row"): | |
original = row[original_index] | |
thumb = row[thumbnail_index] | |
original_exists = self.check_file_exists(original) | |
thumb_exists = self.check_file_exists(thumb) | |
if not (original_exists and thumb_exists): | |
missing_files_count += 1 | |
writer.writerow( | |
("YES" if original_exists else "NO", "YES" if thumb_exists else "NO",) + row | |
) | |
print(f"Number of missing assets: {missing_files_count}") | |
print(f"Exported data into the file successfully: {output_file}") | |
def check_file_exists(self, path: [str, None]) -> bool: | |
if not path: | |
return False | |
full_path = f"{self.parent_path}{path}" | |
if not os.path.exists(full_path): | |
return False | |
return os.path.exists(full_path) | |
if __name__ == "__main__":
    # Connection settings are read from the environment (DB_* variables).
    connector = PostgreSQLConnector(
        host=os.environ.get('DB_HOSTNAME'),
        port=os.environ.get('DB_PORT', '5432'),
        dbname=os.environ.get('DB_DATABASE_NAME'),
        user=os.environ.get('DB_USERNAME'),
        password=os.environ.get('DB_PASSWORD'),
        parent_path="/usr/src/app/"
    )

    # connect() prints connection errors instead of raising them, so a
    # failure is detected through the connection attribute staying None.
    connector.connect()
    if connector.connection is None:
        raise SystemExit(1)

    try:
        # List missing files. To write them to a CSV file instead, call
        # connector.export_missing_files().
        connector.list_missing_files(limit=10)
    finally:
        # Always release the connection, even when the listing fails.
        connector.close_connection()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment