merge indices
// this gist is used for this ticket: https://gist.github.com/819239
// SOLVED when sorting against id!
package org.elasticsearch.test.integration.search.scroll;

import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.xcontent.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

import static org.elasticsearch.common.xcontent.XContentFactory.*;
import static org.elasticsearch.index.query.xcontent.QueryBuilders.*;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;

/**
 * @author kimchy (shay.banon)
 */
public class SearchScrollTests extends AbstractNodesTests {

    private Client client;

    @BeforeClass public void createNodes() throws Exception {
        startNode("node1");
        startNode("node2");
        client = getClient();
    }

    @AfterClass public void closeNodes() {
        client.close();
        closeAllNodes();
    }

    protected Client getClient() {
        return client("node1");
    }

    private Logger logger = LoggerFactory.getLogger(getClass());
    private String indexType = "tweet";
    private String NUM = "num";
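
    /**
     * Reproduces the scroll problem described in the inline comments: a single document is fed
     * first, then 200 documents with ids 0..199 are bulk-indexed into the same index. When the
     * pre-fed id falls inside that range ("0" or "199"), the scroll over index1 returns fewer
     * hits than expected, so the copy into resindex ends up with fewer than 200 documents and
     * the final assertion fails.
     */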
    @Test public void testScrollWithPreUpdate() throws Exception {
        String index1 = "test";
        String resindex = "resindex";
        try {
            client.admin().indices().prepareDelete(index1).execute().actionGet();
            client.admin().indices().prepareDelete(resindex).execute().actionGet();
        } catch (Exception e) {
            // ignore
        }

        // create indices
        createIndex(index1);
        createIndex(resindex);
        waitForGreen();

        boolean showStrangeBehaviour = true;
        boolean test1 = true;
        if (showStrangeBehaviour) {
            if (test1)
                // will result in "collected:185" BUT should be 200
                feedDoc(index1, createDoc(2), "0");
            else
                // collected:169 BUT should be 200
                feedDoc(index1, createDoc(2), "199");
        } else {
            if (test1)
                // collected:201 is okay
                feedDoc(index1, createDoc(2), "-1");
            else
                // collected:201 is okay
                feedDoc(index1, createDoc(2), "200");
        }

        // Thread.sleep(10000);
        // create some equal content from 0..199
        Map<String, XContentBuilder> map = new LinkedHashMap<String, XContentBuilder>();
        for (int i = 0; i < 200; i++) {
            map.put("" + i, createDoc(i));
        }
        bulkUpdate(map, index1);
        // Thread.sleep(10000);

        System.out.println("index1:" + countAll(index1) + " resindex:" + countAll(resindex));
        mergeIndices(Arrays.asList(index1), resindex, 2);
        logger.info("200? " + countAll(resindex));
        assertThat(countAll(resindex), equalTo(200L));
        // node.stop();
    }
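
    /**
     * Creates the given index with two shards; if it already exists the exception is logged and
     * ignored.
     */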
    public void createIndex(String indexName) {
        try {
            client.admin().indices().prepareCreate(indexName).setSettings(
                    ImmutableSettings.settingsBuilder().put("index.number_of_shards", 2)).execute().actionGet();
            // client.admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
        } catch (Exception ex) {
            logger.info(indexName + " already exists!", ex);
        }
    }

    public void waitForGreen() {
        client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
        // client.admin().cluster().health(new ClusterHealthRequest(indexName).waitForYellowStatus()).actionGet();
    }

    public void refresh(String indexName) {
        client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
    }

    public void refresh(String... indices) {
        client.admin().indices().refresh(new RefreshRequest(indices)).actionGet();
    }

    public long countAll(String indexName) {
        CountResponse response = client.prepareCount(indexName).
                setQuery(QueryBuilders.matchAllQuery()).
                execute().actionGet();
        return response.getCount();
    }
    public XContentBuilder createDoc(int num) throws IOException {
        XContentBuilder docBuilder = JsonXContent.unCachedContentBuilder().startObject();
        docBuilder.field(NUM, num);
        docBuilder.endObject();
        return docBuilder;
    }

    public void bulkUpdate(Map<String, XContentBuilder> docs, String indexName) {
        // now using the bulk API instead of feeding each doc separately with feedDoc
        BulkRequestBuilder brb = client.prepareBulk();
        for (Map.Entry<String, XContentBuilder> e : docs.entrySet()) {
            brb.add(Requests.indexRequest(indexName).type(indexType).id(e.getKey()).source(e.getValue()));
        }
        brb.setRefresh(true);
        if (brb.numberOfActions() > 0) {
            BulkResponse rsp = brb.execute().actionGet();
            // System.out.println(rsp.items().length);
            if (rsp.hasFailures())
                System.out.println("Error while bulkUpdate:" + rsp.buildFailureMessage());
        }
    }
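
    /**
     * Copies all documents from the given source indices into intoIndex: each source index is
     * scrolled page by page (hitsPerPage per request) and every page is re-indexed via
     * bulkUpdate. The target index is refreshed before and after the copy.
     */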
    public void mergeIndices(Collection<String> indexList, String intoIndex, int hitsPerPage) {
        refresh(indexList.toArray(new String[0]));
        refresh(intoIndex);
        for (String fromIndex : indexList) {
            // String fromIndex = "";
            SearchResponse rsp = client.prepareSearch(fromIndex).
                    setQuery(QueryBuilders.matchAllQuery()).setSize(hitsPerPage).
                    // THEN_FETCH is even worse
                    setSearchType(SearchType.QUERY_AND_FETCH).
                    setScroll(TimeValue.timeValueMinutes(30)).execute().actionGet();
            try {
                long total = rsp.hits().totalHits();
                int collectedResults = 0;
                do {
                    Map<String, XContentBuilder> docs = collectDocs(rsp);
                    bulkUpdate(docs, intoIndex);
                    collectedResults += rsp.hits().hits().length;
                    rsp = client.prepareSearchScroll(rsp.scrollId()).
                            setScroll(TimeValue.timeValueMinutes(30)).execute().actionGet();
                    // logger.info("Progress " + collectedResults + "/" + total + " fromIndex=" + fromIndex);
                } while (rsp.hits().hits().length > 0);
                // logger.info("Finished copying of index " + fromIndex + ". Total:" + total + " collected:" + collectedResults);
            } catch (Exception ex) {
                logger.error("Failed to copy data from index " + fromIndex + " into " + intoIndex + ".", ex);
            }
        }
        refresh(intoIndex);
    }
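
    /**
     * Rebuilds the documents of one scroll page as a map keyed by document id, recreating each
     * source from its "num" field so it can be fed into bulkUpdate.
     */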
    public Map<String, XContentBuilder> collectDocs(SearchResponse rsp) throws IOException {
        SearchHits docs = rsp.getHits();
        Map<String, XContentBuilder> list = new LinkedHashMap<String, XContentBuilder>();
        for (SearchHit sd : docs) {
            int num = (Integer) sd.getSource().get(NUM);
            list.put(sd.getId(), createDoc(num));
        }
        return list;
    }
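
    /**
     * Indexes a single document under the given id and refreshes immediately so it is visible to
     * the following searches.
     */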
    public void feedDoc(String indexName, XContentBuilder b, String id) {
        IndexRequestBuilder irb = client.prepareIndex(indexName, indexType, id).
                setConsistencyLevel(WriteConsistencyLevel.DEFAULT).setRefresh(true).
                setSource(b);
        irb.execute().actionGet();
    }
}