Skip to content

Instantly share code, notes, and snippets.

@dforste
Last active April 11, 2020 14:10
Show Gist options
  • Select an option

  • Save dforste/db72b1ccd81556f580a0 to your computer and use it in GitHub Desktop.

Select an option

Save dforste/db72b1ccd81556f580a0 to your computer and use it in GitHub Desktop.
Python replacement for elasticsearch delete by query.
#!/bin/python
import elasticsearch
import sys, getopt
def main(argv):
number = 10
search = 'metrics.changes.total:0 AND type:puppet-report'
try:
opts, args = getopt.getopt(argv,"hs:n:",["search=","number="])
except getopt.GetoptError:
print 'delete_from_elasticsearch.py -s <search_expression> -n <number_per_shard>'
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
print 'delete_from_elasticsearch.py -s <search_expression> -n <number_per_shard>'
sys.exit()
elif opt in ("-s", "--search"):
search = arg
elif opt in ("-n", "--number"):
number = arg
print 'I will search for "', search
print 'I will delete these in batches of "', number
delete_docs(search, number)
def delete_docs(search, number=10):
# Setup elasticsearch connection.
es = elasticsearch.Elasticsearch(
['localhost'],
# sniff before doing anything
sniff_on_start=True,
# refresh nodes after a node fails to respond
sniff_on_connection_fail=True,
# and also every 60 seconds
sniffer_timeout=60
)
# Start the initial search.
hits=es.search(
q=search,
index="*logstash-*",
fields="_id",
size=number,
search_type="scan",
scroll='5m',
)
# Now remove the results.
while True:
try:
# Git the next page of results.
scroll=es.scroll( scroll_id=hits['_scroll_id'], scroll='5m', )
except elasticsearch.exceptions.NotFoundError:
break
# We have results initialize the bulk variable.
bulk = ""
# Remove the variables.
for result in scroll['hits']['hits']:
bulk = bulk + '{ "delete" : { "_index" : "' + str(result['_index']) + '", "_type" : "' + str(result['_type']) + '", "_id" : "' + str(result['_id']) + '" } }\n'
# print "Items left " + str(scroll['hits']['total']) + ' deleting ' + str(bulk.count('delete')) + ' items.'
# print bulk
es.bulk( body=bulk )
if __name__ == "__main__":
main(sys.argv[1:])
@jmwilli25
Copy link

jmwilli25 commented Feb 23, 2017

This is exactly what I was looking for. Thank you! The only issue that I have seen is that after the first search you scroll passed the first set of results then you immediately scroll and delete the docs returned by the scroll. The first set of docs returns by the scroll are never deleted.

Ex.

  • 20 docs in ES
  • Perform the search and limit to 10. This returns the first set of 10 docs.
  • Scroll to next 10 and delete them
  • This doesn't delete the first 10 docs returned by the search and never will because the delete doesn't occur until after the first scroll which returns the second "page" of results.

Just put this after the search:

bulk = ""
for result in hits['hits']['hits']:
    bulk = bulk + '{"delete": { "_index": "' + str(result['_index']) + '", "_type": "' + str(result['_type']) + '", "_id": "' + str(result['_id']) + '"} }\n'

es.bulk( body=bulk )

@MatanTubul
Copy link

Hi,

can you provide me some example on how should i run the script?

Thanks

@AhmyOhlin
Copy link

Hello,
I used this programm before 2 week on ES 2.3 and It was working fine. Now i upgraded ES 2.3 to ES 5.2 and i got this Error.
elasticsearch.exceptions.RequestError: TransportError(400, 'illegal_argument_exception', 'No search type for [scan]')
I think that the ES 5.2 doesn't support "scan" option for search_type.
when i comment this line # search_type="scan", then get another Error:
packages\elasticsearch\client_init_.py", line 1094, in bulk raise ValueError("Empty value passed for a required argument 'body'.")
ValueError: Empty value passed for a required argument 'body'.
anyone can help me to fix this problem
Thanks

Copy link

ghost commented May 13, 2017

Hello,

I need to run delete with Routing Number, i am new to Es and Python?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment