Archivematica Elasticsearch Duplicate Finder
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Module docstring."""
import collections
import json
import logging
import operator
import sys
from elasticsearch import Elasticsearch
# Logging configuration
FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=FORMAT)
logger = logging.getLogger("aip.duplicates")
logger.setLevel("INFO")
# We are only interested in transferred objects; Archivematica prefixes
# originalName paths with %transferDirectory%.
TRANSFER_DIR = "%transferDirectory%objects"
# AIP Reference.
Aip = collections.namedtuple(
"Aip", "aip original_name checksum checksum_type uuid"
)
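# Illustrative instance (values are placeholders, not real data):
#   Aip(aip="<AIP UUID>", original_name="%transferDirectory%objects/file.ext",
#       checksum="<digest>", checksum_type="sha256", uuid="<file UUID>")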
# ES configuration
PAGING_SIZE = 10
# Connect to Elasticsearch
es = Elasticsearch(["http://127.0.0.1:62002"])
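# N.B. 62002 is the Elasticsearch port exposed by an Archivematica Docker
# development deployment (an assumption here; adjust the address for your
# environment). A quick reachability check via the client's ping() helper:
if not es.ping():
    logger.warning("Cannot reach Elasticsearch at the configured address.")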
"""
ES Structure Reference. The information we want about checksums per file will
be in this structure.
"_source": {
"mets": {
"ns0:mets_dict_list": [{
"ns0:amdSec_dict_list": [{
"ns0:techMD_dict_list": [{
"ns0:mdWrap_dict_list": [{
"ns0:xmlData_dict_list": [{
"ns3:object_dict_list": [{
"ns3:originalName": "%SIPDirectory%objects/metadata/transfers/broken_premis",
"ns3:objectIdentifier_dict_list": [{
"ns3:objectIdentifierType": "UUID",
"ns3:objectIdentifierValue": "42e03ba0-4a97-49da-9892-73f3b52d1a9a"}],
"ns3:objectCharacteristics_dict_list": [{
"ns3:format_dict_list": [{
"ns3:formatRegistry_dict_list": [],
"ns3:formatDesignation_dict_list": []}],
"ns3:creatingApplication_dict_list": [],
"ns3:fixity_dict_list": [{
"ns3:messageDigestAlgorithm": "sha256",
"ns3:messageDigest": "fb34fb220e77f567b90c91fee37ec32cd95e4a56befe9f8722 ..."}]
"""
fields = {
"_source": [
"*.ns3:fixity_dict_list",
"*.ns3:originalName",
"*.ns3:objectIdentifier_dict_list",
"uuid",
]
}
# If you want to filter the data you can set the query like this.
# Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/
# search-request-source-filtering.html
fields_with_filter = {
"_source": [
"*.ns3:fixity_dict_list",
"*.ns3:originalName",
"*.ns3:objectIdentifier_dict_list",
"uuid",
],
"query": {"wildcard": {"ns3:eventType": "*"}},
}
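# For example (a sketch; fields_with_filter is not used below):
#   filtered = es.search(
#       index="aips", doc_type="aip", body=fields_with_filter, size=PAGING_SIZE
#   )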


def data_processed(mets):
    """Return the approximate (shallow) size of an ES response in bytes;
    the caller keeps the running total of data processed."""
    return sys.getsizeof(mets)


def pretty_print(mets_dict):
"""A nice wrapper to pretty print any JSON we need to debug."""
return json.dumps(mets_dict, indent=4)


def get_hash_data(mets_dict, aip_uuid, checksum_list, duplicate_list):
"""Retrieve the hashes from a METS struct."""
tech_md_secs = mets_dict[0]["ns0:amdSec_dict_list"]
for tech_md in tech_md_secs:
object_dict = None
try:
object_dict = tech_md["ns0:techMD_dict_list"][0][
"ns0:mdWrap_dict_list"
][0]["ns0:xmlData_dict_list"][0]["ns3:object_dict_list"]
        except KeyError:
            # Not every techMD section wraps a PREMIS object; skip those.
            pass
if object_dict:
original_name = object_dict[0]["ns3:originalName"]
if TRANSFER_DIR in original_name:
uuid = None
checksum = None
checksum_type = None
uuid_loc = object_dict[0]["ns3:objectIdentifier_dict_list"]
try:
if uuid_loc[0]["ns3:objectIdentifierType"] == "UUID":
uuid = uuid_loc[0]["ns3:objectIdentifierValue"]
except KeyError:
logger.info("UUID identifiers not found.")
fixity_loc = object_dict[0][
"ns3:objectCharacteristics_dict_list"
]
try:
checksum_type = fixity_loc[0]["ns3:fixity_dict_list"][0][
"ns3:messageDigestAlgorithm"
]
checksum = fixity_loc[0]["ns3:fixity_dict_list"][0][
"ns3:messageDigest"
]
                    if checksum not in checksum_list:
                        # First sighting of this checksum. N.B. the first
                        # occurrence is recorded here but is not itself
                        # reported as a duplicate below.
                        checksum_list[checksum] = None
                    else:
                        # Add the AIP to our running duplicate list.
aip = Aip(
aip=aip_uuid,
original_name=original_name,
uuid=uuid,
checksum=checksum,
checksum_type=checksum_type,
)
duplicate_list[aip] = checksum
except KeyError:
logger.info("Fixity dict_list not found.")


def count_results():
"""Count the number of results we're expecting to return form ES."""
results = es.search(
index="aips", doc_type="aip", body=fields, from_=0, size=0
)
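    # hits.total is an integer in Elasticsearch 6.x and earlier, as assumed
    # here; on 7.x+ it is an object: results["hits"]["total"]["value"].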
total = results["hits"]["total"]
logger.info("Returning %s results", total)
return total


def get_mets(page=0):
    """Retrieve a single page of results from Elasticsearch and return the
    response along with the offset of the next page.
    """
logger.info("Connecting to ES for results, page %s", page)
# Connect to the ES client and perform a search.
return (
es.search(
index="aips",
doc_type="aip",
body=fields,
from_=page,
size=PAGING_SIZE,
),
(page + PAGING_SIZE),
)
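# N.B. from_/size paging is capped by Elasticsearch's index.max_result_window
# (10,000 by default). For larger indices, a scroll-based loop (a sketch with
# the same elasticsearch-py client) would be more robust:
#
#   results = es.search(index="aips", doc_type="aip", body=fields,
#                       size=PAGING_SIZE, scroll="1m")
#   while results["hits"]["hits"]:
#       ...  # process hits
#       results = es.scroll(scroll_id=results["_scroll_id"], scroll="1m")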


def main():
"""Primary entry point for this script."""
# Keep track of the amount of data we need to process to get the
# information we need.
total_data = 0
# Keep track of all the AIP data
duplicate_list = {}
checksum_list = {}
expected_results = count_results()
results, page = get_mets()
while results["hits"]["hits"]:
total_data = total_data + data_processed(results)
for hit in results["hits"]["hits"]:
try:
aip_uuid = hit["_source"]["uuid"]
mets_dict = hit["_source"]["mets"]["ns0:mets_dict_list"]
get_hash_data(
mets_dict, aip_uuid, checksum_list, duplicate_list
)
            except KeyError:
                # Hits without the expected METS structure are skipped.
                pass
if page >= expected_results:
break
results, page = get_mets(page)
logger.info("%s bytes processed", total_data)
sorted_dict = sorted(duplicate_list.items(), key=operator.itemgetter(1))
for dupe in sorted_dict:
print(dupe)


if __name__ == "__main__":
main()
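# Usage (illustrative filename): python aip_duplicates.py
# Each duplicate prints as an (Aip, checksum) tuple, e.g.:
#   (Aip(aip='...', original_name='...', checksum='...',
#        checksum_type='sha256', uuid='...'), '...')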