Last active
November 29, 2020 11:51
-
-
Save driskell/270467628fea76a653b7e3756ab88984 to your computer and use it in GitHub Desktop.
Clean unused catalog images from the Magento 1 media folder. Run it from inside the media folder. It can also scan attributes that use custom backend models.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# yum install mysql-connector-python | |
import ConfigParser | |
from contextlib import contextmanager | |
import getopt | |
import getpass | |
import io | |
from mysql.connector import connection | |
import os | |
import re | |
import sys | |
def CursorContextManager(connection):
    """Build a context-manager factory that wraps a cursor in a transaction.

    Each call to the returned ``manager`` opens a new cursor on
    ``connection`` and yields it to the ``with`` body.

    * If the body finishes normally, the cursor is closed and the
      connection is committed.
    * If the body raises, the exception is reported on stdout, the cursor
      is closed, the connection is rolled back and the exception is
      re-raised.

    :param connection: object providing ``cursor()``, ``commit()`` and
        ``rollback()``.
    :returns: a zero-argument callable usable as ``with manager() as cursor``.
    """
    @contextmanager
    def manager():
        cursor = connection.cursor()
        try:
            yield cursor
        except BaseException as err:
            print('Rolling back transaction due to exception')
            # Print the exception itself: ``err.message`` is deprecated on
            # Python 2 and absent on Python 3, where reading it would raise
            # AttributeError and hide the real failure.
            print(err)
            try:
                cursor.close()
            finally:
                # Roll back even when closing the cursor fails.
                connection.rollback()
            raise
        cursor.close()
        connection.commit()
    return manager
def usage():
    """Print the command-line synopsis to stdout and exit with status 1.

    The synopsis lists every option the script's ``getopt`` call accepts,
    including the database, custom-backend and delete switches.
    """
    print(
        'Usage: clean_images.py [-h|--host HOST] [-u|--user USER]'
        ' [-p|--password PASSWORD] [-P|--prompt-password]'
        ' [-d|--database DATABASE] [-b|--backend MODEL[,MODEL...]]'
        ' [-c|--category-backend MODEL[,MODEL...]] [-D|--delete]'
    )
    sys.exit(1)
# Built-in fallback settings, parsed first so that values found in
# ~/.my.cnf replace them.
defaults = '\n[client]\nhost = localhost\n'

config_parser = ConfigParser.RawConfigParser()
config_parser.readfp(io.BytesIO(defaults))
# The user's MySQL option file is layered on top of the defaults.
config_parser.read(os.path.expanduser('~/.my.cnf'))
def get_config(option):
    """Look up ``option`` in the parsed MySQL configuration.

    The ``client`` section is consulted first and the ``mysql`` section
    second; when both define the option, the ``mysql`` value wins.

    :param option: option name, e.g. ``'host'`` or ``'user'``.
    :returns: the configured value, or ``None`` when neither section has it.
    """
    found = None
    for section in ('client', 'mysql'):
        try:
            candidate = config_parser.get(section, option)
        except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
            # Section or option absent: keep whatever was found so far.
            continue
        if candidate is not None:
            found = candidate
    return found
# Files are only reported unless -D/--delete is given.
delete_allowed = False

# Connection settings start from the parsed configuration and may be
# overridden by the command-line options handled below.
mysql_host = get_config('host')
mysql_user = get_config('user')
mysql_password = get_config('password')
mysql_database = get_config('database')
mysql_prompt_password = False

# Extra attribute backend models to scan, per entity type (-b and -c).
custom_backends = {
    'product': [],
    'category': [],
}

try:
    opts, args = getopt.getopt(
        sys.argv[1:],
        'u:h:p:d:Pb:c:D',
        ['user=', 'password=', 'host=', 'database=', 'prompt-password',
         'backend=', 'category-backend=', 'delete']
    )
except getopt.GetoptError as err:
    # Unknown option or missing argument: explain and show the synopsis
    # instead of dying with a traceback. usage() exits the process.
    print(err)
    usage()

for opt, value in opts:
    if opt in ('-u', '--user'):
        mysql_user = value
    elif opt in ('-p', '--password'):
        mysql_password = value
    elif opt in ('-h', '--host'):
        mysql_host = value
    elif opt in ('-d', '--database'):
        mysql_database = value
    elif opt in ('-P', '--prompt-password'):
        mysql_prompt_password = True
    elif opt in ('-b', '--backend'):
        custom_backends['product'].extend(value.split(','))
    elif opt in ('-c', '--category-backend'):
        # Store each comma-separated model on its own, exactly like
        # --backend does, rather than the whole unsplit string repeatedly.
        custom_backends['category'].extend(value.split(','))
    elif opt in ('-D', '--delete'):
        delete_allowed = True
    else:
        usage()

if mysql_prompt_password:
    mysql_password = getpass.getpass('Password:')

# NOTE: from here on the name ``connection`` refers to the open connection
# object and no longer to the imported ``mysql.connector.connection`` module.
if mysql_host == 'localhost':
    # Local servers are reached through the unix socket rather than TCP.
    connection = connection.MySQLConnection(unix_socket='/var/lib/mysql/mysql.sock', user=mysql_user, password=mysql_password, database=mysql_database)
else:
    connection = connection.MySQLConnection(host=mysql_host, user=mysql_user, password=mysql_password, database=mysql_database)

cursorContext = CursorContextManager(connection)

# Cache of load_attributes() results, keyed by entity type id and filter.
_load_attributes = {}
def load_attributes(entity_type, filter_by, filter_value):
    """Return the EAV attribute ids matching a filter, grouped by backend type.

    Results are cached in the module-level ``_load_attributes`` dict, so each
    distinct (entity type, filter column, filter value) is queried once.

    :param entity_type: ``'product'`` or ``'category'``.
    :param filter_by: name of the ``eav_attribute`` column to filter on. It is
        interpolated into the SQL text, so it must be a trusted column name,
        never user input.
    :param filter_value: value the column must equal (passed as a bound
        parameter).
    :returns: dict mapping ``backend_type`` to a list of ``attribute_id``.
    :raises ValueError: if ``entity_type`` is not a supported value.
    """
    if entity_type == 'product':
        entity_type_id = 4
    elif entity_type == 'category':
        entity_type_id = 3
    else:
        # Raising a plain string is itself a TypeError; use a real exception.
        raise ValueError('Invalid entity_type: %s' % entity_type)

    cache_key = '%s-%s-%s' % (entity_type_id, filter_by, filter_value)
    if cache_key in _load_attributes:
        return _load_attributes[cache_key]

    attributes = {}
    with cursorContext() as cursor:
        cursor.execute(
            'SELECT attribute_id, backend_type'
            ' FROM eav_attribute'
            ' WHERE entity_type_id=%%s AND %s=%%s' % filter_by,
            (entity_type_id, filter_value)
        )
        for (attribute_id, backend_type) in cursor:
            attributes.setdefault(backend_type, []).append(attribute_id)

    _load_attributes[cache_key] = attributes
    return attributes
def prepare_paths(prefix, path, filenames):
    """Yield ``(key, value)`` pairs for the files of one directory.

    Every filename is joined onto ``path``; results that start with
    ``prefix`` have the prefix stripped, and the remainder is yielded twice
    so the pairs can be fed straight into ``dict()``. Joined paths that do
    not start with ``prefix`` are skipped.

    :param prefix: leading path portion to strip.
    :param path: directory containing ``filenames``.
    :param filenames: iterable of file names inside ``path``.
    """
    strip = len(prefix)
    for name in filenames:
        full = os.path.join(path, name)
        if not full.startswith(prefix):
            continue
        relative = full[strip:]
        yield (relative, relative)
def _release_matches(filepaths, matched):
    """Remove from ``filepaths`` the entries that ``matched`` refers to.

    An exact key is removed when present. Otherwise keys that equal
    ``matched`` ignoring case are removed: the database comparison may be
    looser than Python's (presumably a case-insensitive collation; verify
    against the schema), and it is better to keep too much than to delete a
    file that is still referenced. A value matching no key is ignored rather
    than raising ``KeyError``.

    :returns: number of entries removed.
    """
    if matched in filepaths:
        del filepaths[matched]
        return 1
    folded = matched.lower()
    variants = [key for key in filepaths if key.lower() == folded]
    for key in variants:
        del filepaths[key]
    return len(variants)


def _drop_referenced(cursor, filepaths, table, attribute_ids=None, strip_slash=False):
    """Query ``table`` for the remaining paths and drop those it references.

    :param cursor: open database cursor.
    :param filepaths: dict of candidate paths; mutated in place.
    :param table: table whose ``value`` column is searched.
    :param attribute_ids: when given, restrict rows to these attribute ids.
    :param strip_slash: compare without the leading ``/`` of each path (the
        stored value has none in that case) and add it back before removal.
    :returns: number of entries removed from ``filepaths``.
    """
    if not filepaths:
        return 0
    if strip_slash:
        candidates = [name[1:] for name in filepaths]
    else:
        candidates = list(filepaths)
    value_marks = ','.join(['%s'] * len(candidates))
    # TABLE SCAN
    if attribute_ids is None:
        cursor.execute(
            'SELECT DISTINCT value '
            ' FROM %s'
            ' WHERE value IN (%s)'
            % (table, value_marks),
            tuple(candidates)
        )
    else:
        cursor.execute(
            'SELECT DISTINCT value '
            ' FROM %s'
            ' WHERE attribute_id IN (%s) AND value IN (%s)'
            % (table, ','.join(['%s'] * len(attribute_ids)), value_marks),
            tuple(attribute_ids + candidates)
        )
    removed = 0
    for (value,) in cursor:
        removed += _release_matches(filepaths, '/%s' % value if strip_slash else value)
    return removed


def check_files(entity_type, filepaths):
    """Split a chunk of catalog files into database-referenced and unused.

    The chunk is checked, in order, against: the product media gallery
    (products only), attributes whose ``frontend_input`` is ``media_image``,
    attributes whose ``frontend_input`` is ``image`` (values stored without
    the leading slash), and finally attributes using one of the configured
    custom backend models (substring match on the stored value).

    :param entity_type: ``'product'`` or ``'category'``.
    :param filepaths: dict keyed by path relative to the entity directory
        (with leading ``/``). The caller's dict is not modified.
    :returns: ``(used, unused, remaining)`` where ``remaining`` is a dict of
        the paths no query matched.
    """
    remaining = filepaths.copy()
    used = 0
    with cursorContext() as cursor:
        if entity_type == 'product':
            used += _drop_referenced(cursor, remaining, 'catalog_product_entity_media_gallery')

        for backend_type, attribute_ids in load_attributes(entity_type, 'frontend_input', 'media_image').items():
            table = 'catalog_%s_entity_%s' % (entity_type, backend_type)
            used += _drop_referenced(cursor, remaining, table, attribute_ids)

        for backend_type, attribute_ids in load_attributes(entity_type, 'frontend_input', 'image').items():
            table = 'catalog_%s_entity_%s' % (entity_type, backend_type)
            used += _drop_referenced(cursor, remaining, table, attribute_ids, strip_slash=True)

        for backend_model in custom_backends[entity_type]:
            for backend_type, attribute_ids in load_attributes(entity_type, 'backend_model', backend_model).items():
                table = 'catalog_%s_entity_%s' % (entity_type, backend_type)
                # For simpler code, happy to match _ from the filename as any
                # character: rather keep too much than lose what we need.
                # Match on each file since we need to know WHAT matched
                # inside the value; the query echoes the filename back.
                for filename in list(remaining):
                    cursor.execute(
                        # TABLE SCAN
                        'SELECT DISTINCT %%s '
                        ' FROM %s'
                        ' WHERE attribute_id IN (%s) AND value LIKE (%%s)'
                        % (table, ','.join(['%s'] * len(attribute_ids))),
                        tuple([filename] + attribute_ids + ['%%%s%%' % filename[1:]])
                    )
                    for (value,) in cursor:
                        used += _release_matches(remaining, value)
    return (used, len(remaining), remaining)
def check_cache(entity_type, filepaths):
    """Split a chunk of cached images by whether their source file exists.

    Each entry is matched against the cache path layout for the entity type
    to recover the source file path below ``catalog/<entity_type>/``
    (relative to the current directory). Entries whose source file exists
    count as used; the rest are unused. Entries that do not fit the layout
    are reported, dropped from the result and counted as neither.

    :param entity_type: ``'product'`` or ``'category'``.
    :param filepaths: dict keyed by path relative to the entity directory
        (with leading ``/``). The caller's dict is not modified.
    :returns: ``(used, unused, remaining)`` where ``remaining`` is a dict of
        the unused entries.
    :raises ValueError: if ``entity_type`` is not a supported value.
    """
    # The layout depends only on the entity type, so pick it once.
    if entity_type == 'product':
        layout = re.compile(r'/cache/[0-9]+/[^/]+/[^/]+/[^/]+/(.*)')
    elif entity_type == 'category':
        layout = re.compile(r'/cache/[^/]+/(.*)')
    else:
        # Raising a plain string is itself a TypeError; use a real exception.
        raise ValueError('Unknown entity_type')

    remaining = filepaths.copy()
    used = 0
    for entry in list(remaining):
        match = layout.match(entry)
        if match is None:
            print(' Skipping unknown cache entry: %s' % entry)
            del remaining[entry]
            continue
        originalfile = 'catalog/%s/%s' % (entity_type, match.group(1))
        if os.path.isfile(originalfile):
            used += 1
            del remaining[entry]
    return (used, len(remaining), remaining)
def delete_file(filepath):
    """Delete ``filepath``, or only report it when deletion is not enabled.

    The module-level ``delete_allowed`` flag decides between actually
    unlinking the file and a dry run; either way one line is printed naming
    the action and the path.
    """
    if not delete_allowed:
        print(' %s: %s' % ('Would delete', filepath))
        return
    os.unlink(filepath)
    print(' %s: %s' % ('Deleted', filepath))
def scan_entity_type(entity_type):
    """Walk ``catalog/<entity_type>`` and process every file found.

    Files are handled per directory in chunks of 100. Directories whose path
    starts with ``catalog/<entity_type>/cache`` are checked with
    ``check_cache``; all others with ``check_files``. Every file a checker
    returns as unused is passed to ``delete_file``.

    :param entity_type: ``'product'`` or ``'category'``.
    :returns: ``(used, unused)`` totals for the whole tree.
    """
    prefix = 'catalog/%s' % entity_type
    cache_root = '%s/cache' % prefix
    used_total = 0
    unused_total = 0
    for path, _dirnames, filenames in os.walk(prefix):
        print('Verifying: %s' % path)
        in_cache = path.startswith(cache_root)
        for start in range(0, len(filenames), 100):
            batch = filenames[start:start + 100]
            filepaths = dict(prepare_paths(prefix, path, batch))
            if in_cache:
                result = check_cache(entity_type, filepaths)
            else:
                result = check_files(entity_type, filepaths)
            chunk_used, chunk_unused, chunk_todelete = result
            used_total += chunk_used
            unused_total += chunk_unused
            print(' Chunk of %d: %d used / %d unused' % (len(filepaths), chunk_used, chunk_unused))
            for relative in chunk_todelete:
                delete_file('%s%s' % (prefix, relative))
    return used_total, unused_total
# Scan categories first, then products, and report the combined counts.
total_used, total_unused = 0, 0
for entity_type in ('category', 'product'):
    entity_used, entity_unused = scan_entity_type(entity_type)
    total_used += entity_used
    total_unused += entity_unused
print('Totals: %d used, %d unused' % (total_used, total_unused))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment