Created February 8, 2011 20:02
Something goes wrong when indexing documents via bulkUpdate and then retrieving them again with a scroll search: the scroll over the source index collects fewer hits than the index contains (e.g. collected:185 instead of 200). The class below reproduces this.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import java.util.Collection;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.unit.TimeValue;
import java.util.LinkedHashMap;
import java.util.Map.Entry;
import java.util.Arrays;
import java.util.Map;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.count.CountResponse;
import java.io.IOException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.xcontent.QueryBuilders;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.node.Node;
import static org.elasticsearch.node.NodeBuilder.*;

/**
 * Reproduces documents going missing when an index is copied via a scroll
 * search after bulk indexing (see start(), bulkUpdate and mergeIndices).
 *
 * @author Peter Karich, peat_hal 'at' users 'dot' sourceforge 'dot' net
 */
public class TestES {

    public static void main(String[] args) throws IOException {
        new TestES().start();
    }

    private Logger logger = LoggerFactory.getLogger(getClass());
    private Client client;
    private String indexType = "tweet";
    private String NUM = "num";

    public void start() throws IOException {
        // start a local node with 2 shards, no replicas and no gateway persistence
        Node node = nodeBuilder().
                local(true).
                settings(ImmutableSettings.settingsBuilder().
                put("index.number_of_shards", 2).
                put("index.number_of_replicas", 0).
                put("gateway.type", "none").
                build()).
                build().
                start();
        client = node.client();

        // create indices
        String index1 = "index1";
        String resindex = "resindex";
        createIndex(index1);
        createIndex(resindex);
        waitForYellow(resindex);

        boolean showStrangeBehaviour = true;
        boolean test1 = true;
        if (showStrangeBehaviour) {
            if (test1)
                // will result in "collected:185" BUT should be 200
                feedDoc(index1, createDoc(2), "0");
            else
                // collected:169 BUT should be 200
                feedDoc(index1, createDoc(2), "199");
        } else {
            if (test1)
                // collected:201 is okay
                feedDoc(index1, createDoc(2), "-1");
            else
                // collected:201 is okay
                feedDoc(index1, createDoc(2), "200");
        }

        // bulk index 200 docs with ids 0..199 (re-indexing the doc fed above if its id collides)
        Map<String, XContentBuilder> map = new LinkedHashMap<String, XContentBuilder>();
        for (int i = 0; i < 200; i++) {
            map.put("" + i, createDoc(i));
        }
        bulkUpdate(map, index1);
        System.out.println("index1:" + countAll(index1) + " resindex:" + countAll(resindex));

        mergeIndices(Arrays.asList(index1), resindex, 2);
        logger.info("201? " + countAll(resindex));
        node.stop();
    }
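
    // Helper methods below: index creation and refresh, single-document feeding,
    // counting, bulk indexing, and the scroll-based copy (mergeIndices).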
    public void createIndex(String indexName) {
        try {
            client.admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
        } catch (IndexAlreadyExistsException ex) {
            logger.info(indexName + " already exists!", ex);
        }
    }

    public void waitForYellow(String indexName) {
        client.admin().cluster().health(new ClusterHealthRequest(indexName).waitForYellowStatus()).actionGet();
    }

    public void refresh(String indexName) {
        client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
    }

    public void refresh(String... indices) {
        client.admin().indices().refresh(new RefreshRequest(indices)).actionGet();
    }
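
    // Index a single document and refresh immediately; this variant appears
    // unused in this test (feedDoc at the bottom of the class is used instead).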
    private void feed(String indexName, String type, String id, XContentBuilder docBuilder) {
        IndexRequestBuilder irb = client.prepareIndex(indexName, type, id).
                setConsistencyLevel(WriteConsistencyLevel.DEFAULT).
                setSource(docBuilder);
        irb.execute().actionGet();
        refresh(indexName);
    }

    public long countAll(String indexName) {
        CountResponse response = client.prepareCount(indexName).
                setQuery(QueryBuilders.matchAllQuery()).
                execute().actionGet();
        return response.getCount();
    }

    public XContentBuilder createDoc(int num) throws IOException {
        XContentBuilder docBuilder = XContentFactory.jsonBuilder().startObject();
        docBuilder.field(NUM, num);
        docBuilder.endObject();
        return docBuilder;
    }
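
    // Index all docs of the map in one bulk request with refresh enabled.
    // Note: mergeIndices calls this while a scroll over the source index is
    // still open; the refresh happening during that scroll is one possible
    // suspect for the missing hits (an assumption, not verified).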
    public void bulkUpdate(Map<String, XContentBuilder> docs, String indexName) {
        // use the bulk API instead of feeding each doc separately via feedDoc
        BulkRequestBuilder brb = client.prepareBulk();
        for (Entry<String, XContentBuilder> e : docs.entrySet()) {
            brb.add(Requests.indexRequest(indexName).type(indexType).id(e.getKey()).source(e.getValue()));
        }
        // request a refresh as part of the bulk request
        brb.setRefresh(true);
        if (brb.numberOfActions() > 0) {
            // System.out.println("actions:" + brb.numberOfActions());
            BulkResponse rsp = brb.execute().actionGet();
            // number of bulk item responses
            System.out.println(rsp.items().length);
            if (rsp.hasFailures())
                System.out.println("Error while bulkUpdate:" + rsp.buildFailureMessage());
        }
    }
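
    // Copy every document from the given indices into intoIndex using a scroll
    // search (page size = hitsPerPage) and bulkUpdate for the writes.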
    public void mergeIndices(Collection<String> indexList, String intoIndex, int hitsPerPage) {
        refresh(indexList.toArray(new String[0]));
        refresh(intoIndex);

        // A simple merge works when using client.prepareSearch(fromIndex) instead of
        // client.prepareSearch() without arguments, but delete + alias fails because
        // it grabs tweets from index2 too!
        //
        // Looping over all indices in indexList (as done below) fails, and so does the following:
        // client.prepareSearch(Helper.toStringArray(indexList))
        for (String fromIndex : indexList) {
            SearchResponse rsp = client.prepareSearch(fromIndex).
                    setQuery(QueryBuilders.matchAllQuery()).setSize(hitsPerPage).
                    // important
                    setSearchType(SearchType.QUERY_AND_FETCH).
                    setScroll(TimeValue.timeValueMinutes(30)).execute().actionGet();
            try {
                long total = rsp.hits().totalHits();
                int collectedResults = 0;
                int currentResults;
                // page through the scroll until no more hits are returned
                while ((currentResults = rsp.hits().hits().length) > 0) {
                    Map<String, XContentBuilder> docs = collectDocs(rsp);
                    bulkUpdate(docs, intoIndex);
                    collectedResults += currentResults;
                    rsp = client.prepareSearchScroll(rsp.scrollId()).
                            setScroll(TimeValue.timeValueMinutes(30)).execute().actionGet();
                    logger.info("Progress " + collectedResults + "/" + total + " fromIndex=" + fromIndex);
                }
                logger.info("Finished copying of index " + fromIndex + ". Total:" + total + " collected:" + collectedResults);
            } catch (Exception ex) {
                logger.error("Failed to copy data from index " + fromIndex + " into " + intoIndex + ".", ex);
            }
        }
        refresh(intoIndex);
    }
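
    // Rebuild each hit as a fresh XContentBuilder (keyed by its id) so it can be
    // re-indexed into the target index.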
    public Map<String, XContentBuilder> collectDocs(SearchResponse rsp) throws IOException {
        SearchHits docs = rsp.getHits();
        Map<String, XContentBuilder> list = new LinkedHashMap<String, XContentBuilder>();
        for (SearchHit sd : docs) {
            int num = (Integer) sd.getSource().get(NUM);
            list.put(sd.getId(), createDoc(num));
        }
        return list;
    }

    public void feedDoc(String indexName, XContentBuilder b, String id) {
        IndexRequestBuilder irb = client.prepareIndex(indexName, indexType, id).
                setConsistencyLevel(WriteConsistencyLevel.DEFAULT).setRefresh(true).
                setSource(b);
        irb.execute().actionGet();
    }
}