-
-
Save slachiewicz/5a5308c4df12d4d287db to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Reindexer extends BaseElasticsearchUtility { | |
private static Log logger = LogFactory.getLog(Reindexer.class); | |
public static void main(String[] args) { | |
if (args.length != 5) { | |
doUsage(); | |
System.exit(-1); | |
} | |
Reindexer reindexer = new Reindexer(); | |
TransportClient writeClient = reindexer.getClient(args[2], args[0], Integer.parseInt(args[1])); | |
List<Client> writeClients = new ArrayList<Client>(); | |
writeClients.add(writeClient); | |
reindexer.setWriteClients(writeClients); | |
reindexer.reindex(args[3], args[4]); | |
} | |
public static void doUsage() { | |
System.out.println("USAGE:"); | |
System.out.println(" java Reindexer [host] [port] [cluster] [current index] [new index]"); | |
} | |
/** | |
* Copy the current index to another index on the same cluster. The method | |
* will attempt to create the target index. If it already exists, it will | |
* be logged and the method will continue. | |
* | |
* @param currentIndex The current index. | |
* @param targetIndex The target index. | |
*/ | |
public void reindex(String currentIndex, String targetIndex) { | |
logger.info("Reindexing from " + currentIndex + " to " + targetIndex); | |
try { | |
IndicesAdminClient admin = getTargetClient().admin().indices(); | |
admin.prepareCreate(targetIndex).execute().actionGet(); | |
} catch (ElasticsearchException ee) { | |
logger.warn("Tried to create target index, got: " + ee.getMessage()); | |
} | |
SearchResponse scrollResp = getTargetClient().prepareSearch(currentIndex) | |
.setSearchType(SearchType.SCAN) | |
.setScroll(new TimeValue(60000)) | |
.setSize(100).execute().actionGet(); //100 hits per shard will be returned for each scroll | |
//Scroll until no hits are returned | |
while (true) { | |
// the first time through there may be zero hits (this is just how scrolls work) | |
if (scrollResp.getHits().getHits().length > 0) { | |
BulkRequestBuilder bulkRequest = getTargetClient().prepareBulk(); | |
for (SearchHit hit : scrollResp.getHits()) { | |
bulkRequest.add(getTargetClient().prepareIndex(targetIndex, hit.getType()) | |
.setSource(getJson(hit)) | |
); | |
} | |
BulkResponse bulkResponse = bulkRequest.execute().actionGet(); | |
if (bulkResponse.hasFailures()) { | |
logger.error("There were failures:"); | |
for (BulkItemResponse response : bulkResponse.getItems()) { | |
logger.error(response.getFailureMessage()); | |
} | |
} else { | |
logger.info("Batch completed successfully"); | |
} | |
} | |
scrollResp = getTargetClient().prepareSearchScroll(scrollResp.getScrollId()) | |
.setScroll(new TimeValue(600000)) | |
.execute() | |
.actionGet(); | |
//Break condition: No hits are returned | |
if (scrollResp.getHits().getHits().length == 0) { | |
break; | |
} | |
} | |
logger.info("Done reindexing!"); | |
} | |
/** | |
* Override this method to transform the object before it is reindexed. | |
* @param hit | |
* @return JSON representing the transformed document. | |
*/ | |
public String getJson(SearchHit hit) { | |
return hit.getSourceAsString(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment