Archivematica Elasticsearch Duplicate Finder
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Find duplicate files across AIPs by comparing the checksums recorded in
Archivematica's Elasticsearch aips index."""
import collections
import json
import logging
import operator
import sys

from elasticsearch import Elasticsearch
# Logging configuration.
FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=FORMAT)
logger = logging.getLogger("aip.duplicates")
logger.setLevel("INFO")

# We are only interested in transferred objects.
TRANSFER_DIR = "%transferDirectory%objects"

# AIP Reference.
Aip = collections.namedtuple(
    "Aip", "aip original_name checksum checksum_type uuid"
)

# ES configuration.
PAGING_SIZE = 10

# Connect to Elasticsearch.
es = Elasticsearch(["http://127.0.0.1:62002"])
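# Note: the URL above assumes a local Elasticsearch instance listening on
# port 62002, as in the author's setup; change the host and port to match
# your own Archivematica deployment.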
""" | |
ES Structure Reference. The information we want about checksums per file will | |
be in this structure. | |
"_source": { | |
"mets": { | |
"ns0:mets_dict_list": [{ | |
"ns0:amdSec_dict_list": [{ | |
"ns0:techMD_dict_list": [{ | |
"ns0:mdWrap_dict_list": [{ | |
"ns0:xmlData_dict_list": [{ | |
"ns3:object_dict_list": [{ | |
"ns3:originalName": "%SIPDirectory%objects/metadata/transfers/broken_premis", | |
"ns3:objectIdentifier_dict_list": [{ | |
"ns3:objectIdentifierType": "UUID", | |
"ns3:objectIdentifierValue": "42e03ba0-4a97-49da-9892-73f3b52d1a9a"}], | |
"ns3:objectCharacteristics_dict_list": [{ | |
"ns3:format_dict_list": [{ | |
"ns3:formatRegistry_dict_list": [], | |
"ns3:formatDesignation_dict_list": []}], | |
"ns3:creatingApplication_dict_list": [], | |
"ns3:fixity_dict_list": [{ | |
"ns3:messageDigestAlgorithm": "sha256", | |
"ns3:messageDigest": "fb34fb220e77f567b90c91fee37ec32cd95e4a56befe9f8722 ..."}] | |
""" | |
fields = {
    "_source": [
        "*.ns3:fixity_dict_list",
        "*.ns3:originalName",
        "*.ns3:objectIdentifier_dict_list",
        "uuid",
    ]
}

# If you want to filter the data you can set the query like this.
# Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/
# search-request-source-filtering.html
fields_with_filter = {
    "_source": [
        "*.ns3:fixity_dict_list",
        "*.ns3:originalName",
        "*.ns3:objectIdentifier_dict_list",
        "uuid",
    ],
    "query": {"wildcard": {"ns3:eventType": "*"}},
}

def data_processed(mets):
    """Return the approximate size of a results object so that main() can
    keep a running total of all the data processed."""
    return sys.getsizeof(mets)


def pretty_print(mets_dict):
    """A nice wrapper to pretty print any JSON we need to debug."""
    return json.dumps(mets_dict, indent=4)

def get_hash_data(mets_dict, aip_uuid, checksum_list, duplicate_list):
    """Retrieve the hashes from a METS struct."""
    tech_md_secs = mets_dict[0]["ns0:amdSec_dict_list"]
    for tech_md in tech_md_secs:
        object_dict = None
        try:
            object_dict = tech_md["ns0:techMD_dict_list"][0][
                "ns0:mdWrap_dict_list"
            ][0]["ns0:xmlData_dict_list"][0]["ns3:object_dict_list"]
        except KeyError:
            pass
        if object_dict:
            original_name = object_dict[0]["ns3:originalName"]
            if TRANSFER_DIR in original_name:
                uuid = None
                checksum = None
                checksum_type = None
                uuid_loc = object_dict[0]["ns3:objectIdentifier_dict_list"]
                try:
                    if uuid_loc[0]["ns3:objectIdentifierType"] == "UUID":
                        uuid = uuid_loc[0]["ns3:objectIdentifierValue"]
                except KeyError:
                    logger.info("UUID identifiers not found.")
                fixity_loc = object_dict[0][
                    "ns3:objectCharacteristics_dict_list"
                ]
                try:
                    checksum_type = fixity_loc[0]["ns3:fixity_dict_list"][0][
                        "ns3:messageDigestAlgorithm"
                    ]
                    checksum = fixity_loc[0]["ns3:fixity_dict_list"][0][
                        "ns3:messageDigest"
                    ]
                    if checksum not in checksum_list:
                        checksum_list[checksum] = None
                    else:
                        # Add the AIP to our running duplicate list.
                        aip = Aip(
                            aip=aip_uuid,
                            original_name=original_name,
                            uuid=uuid,
                            checksum=checksum,
                            checksum_type=checksum_type,
                        )
                        duplicate_list[aip] = checksum
                except KeyError:
                    logger.info("Fixity dict_list not found.")

def count_results():
    """Count the number of results we're expecting to return from ES."""
    results = es.search(
        index="aips", doc_type="aip", body=fields, from_=0, size=0
    )
    total = results["hits"]["total"]
    logger.info("Returning %s results", total)
    return total
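
# Note: this script targets an Elasticsearch version that still uses document
# types (doc_type). On Elasticsearch 7+ "hits.total" comes back as an object
# rather than a plain integer, so a version-tolerant read might look like the
# sketch below (the helper name is illustrative and is not used elsewhere in
# this script).
def total_hits(results):
    """Return the hit count whether hits.total is an int or an object."""
    total = results["hits"]["total"]
    if isinstance(total, dict):
        return total["value"]
    return total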

def get_mets(page=0):
    """Connect to Elasticsearch per number of pages we require and retrieve
    results.
    """
    logger.info("Connecting to ES for results, page %s", page)
    # Connect to the ES client and perform a search.
    return (
        es.search(
            index="aips",
            doc_type="aip",
            body=fields,
            from_=page,
            size=PAGING_SIZE,
        ),
        (page + PAGING_SIZE),
    )
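
# The from/size paging above works for modest AIP counts, but Elasticsearch
# caps from + size at 10,000 by default. A possible alternative, sketched
# here but not wired into main(), is the scroll-based scan helper (the
# function name is illustrative).
def iter_mets_with_scan():
    """Return an iterator over AIP documents via elasticsearch.helpers.scan."""
    from elasticsearch import helpers

    return helpers.scan(es, index="aips", doc_type="aip", query=fields)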

def main():
    """Primary entry point for this script."""
    # Keep track of the amount of data we need to process to get the
    # information we need.
    total_data = 0
    # Keep track of all the AIP data.
    duplicate_list = {}
    checksum_list = {}
    expected_results = count_results()
    results, page = get_mets()
    while results["hits"]["hits"]:
        total_data = total_data + data_processed(results)
        for hit in results["hits"]["hits"]:
            try:
                aip_uuid = hit["_source"]["uuid"]
                mets_dict = hit["_source"]["mets"]["ns0:mets_dict_list"]
                get_hash_data(
                    mets_dict, aip_uuid, checksum_list, duplicate_list
                )
            except KeyError:
                pass
        if page >= expected_results:
            break
        results, page = get_mets(page)
    logger.info("%s bytes processed", total_data)
    sorted_dict = sorted(duplicate_list.items(), key=operator.itemgetter(1))
    for dupe in sorted_dict:
        print(dupe)


if __name__ == "__main__":
    main()
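When run against a populated aips index, the script prints one (Aip, checksum) pair per duplicate file, sorted by checksum so that copies of the same file sit next to each other. A sketch of the shape of a single output line, with placeholder values rather than real output:

(Aip(aip='<aip-uuid>', original_name='%transferDirectory%objects/<path>', checksum='<digest>', checksum_type='sha256', uuid='<file-uuid>'), '<digest>')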