Remove duplicated records in Elasticsearch
#!/usr/bin/env python3.6
# A description and analysis of this code can be found at
# https://alexmarquardt.com/2018/07/23/deduplicating-documents-in-elasticsearch/
from argparse import ArgumentParser
import hashlib
from elasticsearch import Elasticsearch

parser = ArgumentParser()
parser.add_argument('url', help='endpoint of Elasticsearch')
parser.add_argument('index', help='index to deduplicate')
parser.add_argument('-c', '--columns', required=True, metavar='COLUMN', nargs='+',
                    help='fields used to determine whether a document is a duplicate')
parser.add_argument('-t', '--doc_type', default='log', help='doc type')
args = parser.parse_args()

es = Elasticsearch(args.url)
dict_of_duplicate_docs = {}

# The following line defines the fields that will be
# used to determine if a document is a duplicate
keys_to_include_in_hash = args.columns


# Process documents returned by the current search/scroll
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            try:
                combined_key += str(item['_source'][mykey])
            except KeyError:
                # Skip fields that are missing from this document
                continue
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        # If the hashval is new, then we will create a new key
        # in dict_of_duplicate_docs, which will be assigned
        # a value of an empty array.
        # We then immediately push the _id onto the array.
        # If hashval already exists, then
        # we just push the new _id onto the existing array.
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)


# Loop over all documents in the index and populate the
# dict_of_duplicate_docs data structure.
def scroll_over_all_docs():
    data = es.search(index=args.index, scroll='1m', body={"query": {"match_all": {}}})
    # Get the scroll ID
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    # Before scrolling, process the current batch of hits
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        # Process the current batch of hits
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        # Update the scroll ID
        sid = data['_scroll_id']
        # Get the number of results returned by the last scroll
        scroll_size = len(data['hits']['hits'])


def loop_over_hashes_and_remove_duplicates():
    # Search through the hash of doc values to see if any
    # duplicate hashes have been found
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
        if len(array_of_ids) > 1:
            print("********** Duplicate docs hash=%s **********" % hashval)
            # Get the documents that have mapped to the current hashval
            matching_docs = es.mget(index=args.index, doc_type=args.doc_type, body={"ids": array_of_ids})
            # To rule out hash collisions, you could additionally compare all
            # fields of the returned docs and only delete documents that are
            # truly identical. Here we keep the first document in each group
            # and delete the rest.
            for doc in matching_docs['docs'][1:]:
                es.delete(index=args.index, doc_type=args.doc_type, id=doc['_id'])
                # print("doc=%s\n" % doc)


def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()


if __name__ == "__main__":
    main()
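A minimal usage sketch follows; the script file name (dedupe_es.py), endpoint, index name, and field names are hypothetical placeholders to be replaced with your own values:

    python3 dedupe_es.py http://localhost:9200 my-log-index -c timestamp host message -t log

This hashes the timestamp, host, and message fields of every document in my-log-index and, for each group of documents that share a hash, keeps one copy and deletes the rest.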