Skip to content

Instantly share code, notes, and snippets.

@mitallast
Created August 1, 2016 14:52
Show Gist options
  • Select an option

  • Save mitallast/bf66aa5f0a3513c08398b53ae61e53b4 to your computer and use it in GitHub Desktop.

Select an option

Save mitallast/bf66aa5f0a3513c08398b53ae61e53b4 to your computer and use it in GitHub Desktop.
Reindex an Elasticsearch index using the parallel bulk API
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk, scan
import json
import time
import codecs
import logging
import sys
# Log to stdout so progress is visible when the script is run interactively.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.info("start reindex")

# Silence the per-request trace logger; keep the main client logger chatty.
trace_logger = logging.getLogger('elasticsearch.trace')
trace_logger.setLevel(logging.INFO)
trace_logger.propagate = False

client_logger = logging.getLogger('elasticsearch')
client_logger.setLevel(logging.INFO)
client_logger.propagate = True

# Long timeout plus aggressive retries: scroll/bulk calls on a large
# index can legitimately take a while.
client = Elasticsearch(
    [{"host": "localhost"}],
    retry_on_timeout=True,
    max_retries=10,
    timeout=1000,
)
def parallel_reindex(client, source_index, target_index, query=None,
                     target_client=None, chunk_size=500, scroll='5m',
                     scan_kwargs=None, bulk_kwargs=None):
    """Reindex documents matching *query* from *source_index* into *target_index*.

    Streams hits with ``scan`` and writes them with ``parallel_bulk``.

    Parameters
    ----------
    client : Elasticsearch
        Client used to scan the source index.
    source_index, target_index : str
        Names of the index to read from and the index to write to.
    query : dict, optional
        Elasticsearch query body; ``None`` reindexes every document.
    target_client : Elasticsearch, optional
        Client for the destination cluster; defaults to *client*.
    chunk_size : int
        Number of documents per bulk request.
    scroll : str
        How long each scroll context is kept alive between fetches.
    scan_kwargs, bulk_kwargs : dict, optional
        Extra keyword arguments forwarded to ``scan`` / ``parallel_bulk``.
    """
    # Fix: the originals were mutable default arguments ({}), which are
    # shared across calls and would leak state if ever mutated.
    scan_kwargs = {} if scan_kwargs is None else scan_kwargs
    bulk_kwargs = {} if bulk_kwargs is None else bulk_kwargs
    target_client = client if target_client is None else target_client

    docs = scan(client,
                query=query,
                index=source_index,
                scroll=scroll,
                **scan_kwargs)

    def _change_doc_index(hits, index):
        # Retarget each hit at the destination index; flatten any stored
        # 'fields' into the top level so the bulk helper indexes them.
        for h in hits:
            h['_index'] = index
            if 'fields' in h:
                h.update(h.pop('fields'))
            yield h

    # Bug fix: the original built a kwargs dict here but never passed it
    # to parallel_bulk, so bulk_kwargs were silently ignored.
    # NOTE(review): 'stats_only' (previously injected here) is an option
    # of bulk()/reindex(), not parallel_bulk(), so it is no longer added.
    kwargs = dict(bulk_kwargs)
    for response in parallel_bulk(target_client,
                                  _change_doc_index(docs, target_index),
                                  thread_count=8,
                                  chunk_size=chunk_size,
                                  **kwargs):
        logging.info("response: %s", response)
# Copy every document from 'source-index' into 'target-index'.
match_all_query = {"query": {"match_all": {}}}
parallel_reindex(client,
                 source_index='source-index',
                 target_index='target-index',
                 query=match_all_query,
                 chunk_size=500,
                 scroll='10m')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment