@thuhak
Created April 26, 2020 06:06
Remove duplicated records in Elasticsearch
#!/usr/bin/env python3.6
# A description and analysis of this code can be found at
# https://alexmarquardt.com/2018/07/23/deduplicating-documents-in-elasticsearch/
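# Example invocation (the script name, endpoint, index and field names below
# are placeholders, not values from the original gist):
#   ./dedupe.py http://localhost:9200 my-index -c timestamp host message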
from argparse import ArgumentParser
import hashlib
from elasticsearch import Elasticsearch
parser = ArgumentParser()
parser.add_argument('url', help='endpoint of elasticsearch')
parser.add_argument('index', help='index of elasticsearch')
parser.add_argument('-c', '--columns', required=True, metavar='COLUMN', nargs='+', help='fields used to determine whether a document is a duplicate')
parser.add_argument('-t', '--doc_type', default='log', help='doc type')
args = parser.parse_args()
es = Elasticsearch(args.url)
dict_of_duplicate_docs = {}
# The following line defines the fields that will be
# used to determine if a document is a duplicate
keys_to_include_in_hash = args.columns
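# For example, passing "-c host message" (placeholder field names) means two
# documents with identical host and message values are treated as duplicates.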
# Process documents returned by the current search/scroll
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            try:
                combined_key += str(item['_source'][mykey])
            except KeyError:
                # Skip fields that are missing from this document
                continue
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        # If the hashval is new, setdefault creates a new key in
        # dict_of_duplicate_docs with an empty list as its value.
        # In either case the _id is appended to the list for that hashval.
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
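# After all batches have been processed, dict_of_duplicate_docs maps each md5
# digest to the list of document _ids whose selected fields hashed to it;
# only entries holding more than one _id are treated as duplicates below.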
# Loop over all documents in the index, and populate the
# dict_of_duplicate_docs data structure.
def scroll_over_all_docs():
    data = es.search(index=args.index, scroll='1m', body={"query": {"match_all": {}}})
    # Get the scroll ID
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    # Before scrolling, process the first batch of hits
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        # Process the current batch of hits
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        # Update the scroll ID
        sid = data['_scroll_id']
        # Get the number of results returned by the last scroll
        scroll_size = len(data['hits']['hits'])
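# scroll_over_all_docs finishes once a scroll request returns an empty batch,
# meaning every document in the index has been visited exactly once.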
def loop_over_hashes_and_remove_duplicates():
    # Search through the hash of doc values to see if any
    # duplicate hashes have been found
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
        if len(array_of_ids) > 1:
            print("********** Duplicate docs hash=%s **********" % hashval)
            # Get the documents that have mapped to the current hashval
            matching_docs = es.mget(index=args.index, doc_type=args.doc_type, body={"ids": array_of_ids})
            # To rule out hash collisions, the full _source of these documents
            # could be compared here before deleting anything. This script
            # keeps the first matching document and deletes the rest.
            for doc in matching_docs['docs'][1:]:
                # print("doc=%s\n" % doc)
                es.delete(index=args.index, doc_type=args.doc_type, id=doc['_id'])
def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()


if __name__ == "__main__":
    main()