Created
September 2, 2015 13:52
-
-
Save henrikno/e0ebd6804cb62491343c to your computer and use it in GitHub Desktop.
Elasticsearch bulk index with retry and failure check
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.net.InetSocketAddress; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
import java.util.function.Consumer; | |
import org.elasticsearch.action.WriteConsistencyLevel; | |
import org.elasticsearch.action.bulk.BulkItemResponse; | |
import org.elasticsearch.action.bulk.BulkRequestBuilder; | |
import org.elasticsearch.action.bulk.BulkResponse; | |
import org.elasticsearch.action.get.GetResponse; | |
import org.elasticsearch.client.transport.TransportClient; | |
import org.elasticsearch.common.settings.Settings; | |
import org.elasticsearch.common.transport.InetSocketTransportAddress; | |
public class ElasticSearchTest { | |
public static final String INDEX_NAME = "dummy_index"; | |
private TransportClient client; | |
public static void main(String[] args) throws Exception { | |
new ElasticSearchTest().run(); | |
} | |
public void run() throws InterruptedException { | |
setup(); | |
if (!client.admin().indices().prepareExists(INDEX_NAME).get().isExists()) { | |
client.admin().indices().prepareCreate(INDEX_NAME).get(); | |
} | |
System.out.println("Waiting..."); | |
client.admin().cluster().prepareHealth().setWaitForGreenStatus().get(); | |
System.out.println("Starting..."); | |
int numberOfRequests = 200; | |
long startTime = System.nanoTime(); | |
List<String> ids = new ArrayList<>(); | |
for (int j = 0; j < numberOfRequests; j++) { | |
retry(j, (x) -> { | |
BulkRequestBuilder bulk = client.prepareBulk(); | |
int bulkSize = 1000; | |
for (int i = 0; i < bulkSize; i++) { | |
String id = String.valueOf(bulkSize * x + i); | |
bulk.add(client.prepareIndex(INDEX_NAME, "dummy", id) | |
.setConsistencyLevel(WriteConsistencyLevel.ALL) | |
.setSource(DOCUMENT.replace("<<documentId>>", id))); | |
} | |
System.out.println("Bulk " + x + " started"); | |
BulkResponse bulkResponse = bulk.get(); | |
if (bulkResponse.hasFailures()) { | |
throw new RuntimeException(bulkResponse.buildFailureMessage()); | |
} | |
for (BulkItemResponse baa : bulkResponse) { | |
if (baa.getResponse().getShardInfo().getFailed() > 0) { | |
throw new RuntimeException("Shard level failure"); | |
} | |
} | |
// FlushResponse flush = client.admin().indices().prepareFlush(INDEX_NAME).setWaitIfOngoing(true).get(); | |
// if (flush.getFailedShards() > 0) { | |
// throw new RuntimeException("Flush failed"); | |
// } | |
// client.admin().indices().prepareRefresh("dummy_index").get(); | |
for (BulkItemResponse item : bulkResponse) { | |
if (!item.isFailed()) { | |
ids.add(item.getId()); | |
} | |
} | |
System.out.println("Bulk " + x + " finished"); | |
}); | |
} | |
client.admin().cluster().prepareHealth().setWaitForGreenStatus().get(); | |
long endTime = System.nanoTime(); | |
long msUsed = TimeUnit.NANOSECONDS.toMillis(endTime - startTime); | |
System.out.println("ms used: " + msUsed); | |
System.out.println("Documents: " + ids.size()); | |
System.out.println("DPS: " + (1000000000.0 * ids.size() / (endTime - startTime))); | |
client.admin().indices().prepareRefresh(INDEX_NAME).get(); | |
long documentCount = client.prepareCount(INDEX_NAME).get().getCount(); | |
System.out.println("ES document _count = " + documentCount); | |
System.out.println("Checking documents"); | |
long notFound = 0; | |
for (String idVal : ids) { | |
GetResponse getFields = client.prepareGet(INDEX_NAME, "dummy", idVal).get(); | |
if (!getFields.isExists()) { | |
notFound++; | |
} | |
} | |
System.out.println("Documents Lost: " + notFound); | |
client.admin().indices().prepareRefresh(INDEX_NAME).get(); | |
System.out.println("ES document _count = " + client.prepareCount(INDEX_NAME).get().getCount()); | |
} | |
private void setup() { | |
Settings settings = Settings.settingsBuilder() | |
.put("node.client", true) | |
.put("client.transport.ignore_cluster_name", true) | |
.put("path.home", "/tmp/es_tmp") | |
.build(); | |
client = TransportClient.builder().settings(settings).build(); | |
client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("localhost", 9300))); | |
client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("localhost", 9301))); | |
client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("localhost", 9302))); | |
} | |
private static void retry(int x, Consumer<Integer> runnable) { | |
while (true) { | |
try { | |
runnable.accept(x); | |
break; | |
} catch (Exception e) { | |
e.printStackTrace(); | |
System.out.println("Retrying bulk ..." + x); | |
} | |
} | |
} | |
private static final String DOCUMENT = "{\"data\": \"Insert some interesting data here\"}"; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment