Skip to content

Instantly share code, notes, and snippets.

@kennycason
Created December 4, 2013 05:27
Show Gist options
  • Select an option

  • Save kennycason/7782807 to your computer and use it in GitHub Desktop.

Select an option

Save kennycason/7782807 to your computer and use it in GitHub Desktop.
Elastic Search Comment Importer
package app.insights.search.elasticsearch;
import app.insights.search.elasticsearch.query.IQuery;
import app.insights.search.elasticsearch.types.IIndexable;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.JestResult;
import io.searchbox.client.config.ClientConfig;
import io.searchbox.core.Bulk;
import io.searchbox.core.Index;
import io.searchbox.core.Search;
import io.searchbox.indices.CreateIndex;
import io.searchbox.indices.DeleteIndex;
import org.apache.log4j.Logger;
import java.util.List;
import java.util.Map;
/**
 * Thin wrapper around a Jest {@link JestClient} that lazily connects to an Elasticsearch HTTP
 * endpoint and exposes index-management, bulk-indexing and search operations.
 *
 * <p>Every operation logs and swallows failures instead of throwing, so callers cannot rely on
 * exceptions to detect errors.
 *
 * <p>Created by kenny on 12/2/13.
 */
public class ElasticSearch {

    private static final Logger LOGGER = Logger.getLogger(ElasticSearch.class);

    /** Base URL of the Elasticsearch endpoint handed to the Jest client configuration. */
    private final String apiUrl;

    /** Lazily created client; {@code null} until {@link #connectedClient()} succeeds. */
    private JestClient client;

    /**
     * @param apiUrl base URL of the Elasticsearch endpoint; no connection is made until the
     *     first operation
     */
    public ElasticSearch(String apiUrl) {
        this.apiUrl = apiUrl;
    }

    /**
     * Returns the Jest client, creating it on first use.
     *
     * <p>Synchronized because the client is built as multi-threaded and this object may be
     * shared. Without the lock, concurrent first calls could each construct a separate client.
     * Taking the lock on every call also makes the {@code client} write visible to other threads.
     *
     * @return the client, or {@code null} if it could not be created (the failure is logged)
     */
    private synchronized JestClient connectedClient() {
        if (client == null) {
            try {
                ClientConfig clientConfig = new ClientConfig.Builder(apiUrl).multiThreaded(true).build();
                JestClientFactory factory = new JestClientFactory();
                factory.setClientConfig(clientConfig);
                client = factory.getObject();
            } catch (Exception e) {
                LOGGER.error("Failed to create Jest client for " + apiUrl, e);
                client = null;
            }
        }
        return client;
    }

    /** Logs an error when Elasticsearch reports that {@code action} did not succeed. */
    private static void logIfFailed(String action, JestResult result) {
        if (result != null && !result.isSucceeded()) {
            LOGGER.error("Elasticsearch " + action + " failed: " + result.getErrorMessage());
        }
    }

    /**
     * Deletes {@code index}. Failures (including a missing index) are logged, not thrown.
     *
     * @param index name of the index to drop
     */
    public void deleteIndex(String index) {
        JestClient jest = connectedClient();
        if (jest == null) {
            LOGGER.warn("No Elasticsearch client; skipping delete of index " + index);
            return;
        }
        try {
            logIfFailed("delete index " + index, jest.execute(new DeleteIndex.Builder(index).build()));
        } catch (Exception e) {
            LOGGER.error("Failed to delete index " + index, e);
        }
    }

    /**
     * Creates {@code index} with the server's default settings.
     *
     * @param index name of the index to create
     */
    public void createIndex(String index) {
        createIndex(index, null);
    }

    /**
     * Creates {@code index}, optionally with explicit settings. Failures are logged, not thrown.
     *
     * @param index name of the index to create
     * @param settings index settings passed to {@code CreateIndex.Builder.settings}, or
     *     {@code null} for the server's defaults
     */
    public void createIndex(String index, Map<String, String> settings) {
        JestClient jest = connectedClient();
        if (jest == null) {
            LOGGER.warn("No Elasticsearch client; skipping creation of index " + index);
            return;
        }
        try {
            CreateIndex.Builder builder = new CreateIndex.Builder(index);
            if (settings != null) {
                builder.settings(settings);
            }
            logIfFailed("create index " + index, jest.execute(builder.build()));
        } catch (Exception e) {
            LOGGER.error("Failed to create index " + index, e);
        }
    }

    /**
     * Sends all {@code indexables} to {@code index}/{@code type} in one bulk request.
     *
     * <p>A {@code null} or empty list is a no-op. Elasticsearch rejects a bulk request that
     * contains no actions, so none is sent.
     *
     * @param indexables documents to index
     * @param index target index name
     * @param type target document type
     */
    public void bulk(List<IIndexable> indexables, String index, String type) {
        if (indexables == null || indexables.isEmpty()) {
            LOGGER.debug("No documents to bulk index into " + index + "/" + type);
            return;
        }
        JestClient jest = connectedClient();
        if (jest == null) {
            LOGGER.warn("No Elasticsearch client; skipping bulk index of "
                    + indexables.size() + " documents into " + index);
            return;
        }
        try {
            LOGGER.info("Building bulk index data");
            Bulk.Builder bulkBuilder = new Bulk.Builder();
            for (IIndexable indexable : indexables) {
                bulkBuilder.addAction(new Index.Builder(indexable).index(index).type(type).build());
            }
            logIfFailed("bulk index into " + index, jest.execute(bulkBuilder.build()));
        } catch (Exception e) {
            LOGGER.error("Bulk index of " + indexables.size() + " documents into " + index + " failed", e);
        }
    }

    /**
     * Runs {@code query} against the index it names.
     *
     * @param query query supplying both the JSON body and the target index
     * @return the raw Jest result (which may itself report failure), or {@code null} if no
     *     client is available or the request threw
     */
    public JestResult search(IQuery query) {
        JestClient jest = connectedClient();
        if (jest == null) {
            LOGGER.warn("No Elasticsearch client; skipping search");
            return null;
        }
        try {
            Search search = new Search.Builder(query.toJson())
                    .addIndex(query.getIndex())
                    .build();
            return jest.execute(search);
        } catch (Exception e) {
            LOGGER.error("Elasticsearch search failed", e);
            return null;
        }
    }

    /**
     * Builds an instance pointing at the hard-coded hosted endpoint.
     * NOTE(review): the URL contains a {@code <MY_HASH>} placeholder that must be filled in.
     */
    public static ElasticSearch buildDefault() {
        return new ElasticSearch("https://<MY_HASH>.qbox.io/");
    }
}
package app.insights.search.elasticsearch;
import app.insights.search.elasticsearch.types.IIndexable;
import app.models.entities.SearchIndexCommentEntity;
import app.models.entities.TopicEntity;
import app.models.repositories.SearchIndexCommentRepository;
import app.models.repositories.TopicRepository;
import lib.spring.DefaultSpringContextLoader;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import static com.google.common.collect.Collections2.transform;
/**
 * Loads each topic's comments from the database and bulk-indexes them into the
 * {@code comments} Elasticsearch index (document type {@code comment}). Documents are sent
 * {@code BULK_CHUNK_SIZE} at a time.
 *
 * <p>Created by kenny on 11/25/13.
 */
@Service
public class ElasticSearchTopicImporter {

    private static final Logger LOGGER = Logger.getLogger(ElasticSearchTopicImporter.class);

    /** Maximum number of documents sent in a single bulk request. */
    private static final int BULK_CHUNK_SIZE = 500;

    /** Target index and document type for every comment written by this importer. */
    private static final String INDEX_NAME = "comments";
    private static final String DOCUMENT_TYPE = "comment";

    @Autowired
    private SearchIndexCommentRepository searchIndexCommentRepository;

    @Autowired
    private TopicRepository topicRepository;

    private final ElasticSearch elasticSearch;

    /** Command-line entry point: loads the production Spring context and indexes all topics. */
    public static void main(String[] args) {
        ApplicationContext applicationContext = DefaultSpringContextLoader.loadMasterProductionApplicationContext();
        applicationContext.getBean(ElasticSearchTopicImporter.class).indexAll();
    }

    public ElasticSearchTopicImporter() {
        elasticSearch = ElasticSearch.buildDefault();
        // To rebuild the index from scratch, re-enable the block below. It drops the "comments"
        // index and recreates it with 2 shards and 0 replicas. It also needs java.util.HashMap
        // and java.util.Map imported.
        /*
        elasticSearch.deleteIndex("comments");
        Map<String, String> settings = new HashMap<String, String>();
        settings.put("number_of_shards", "2");
        settings.put("number_of_replicas", "0");
        elasticSearch.createIndex("comments", settings);
        */
    }

    /** Indexes the comments of every topic whose id is at least 1. */
    public void indexAll() {
        for (TopicEntity topicEntity : topicRepository.findAll()) {
            // NOTE(review): ids below 1 are skipped; presumably placeholder topics, confirm.
            if (topicEntity.getId() >= 1) {
                index(topicEntity);
            }
        }
    }

    /**
     * Bulk-indexes all comments belonging to {@code topicEntity}.
     *
     * <p>Any failure is logged with the topic id and swallowed, so that in {@link #indexAll()}
     * one bad topic does not stop the remaining ones.
     *
     * @param topicEntity topic whose comments are indexed
     */
    public void index(TopicEntity topicEntity) {
        LOGGER.info("Indexing topic: " + topicEntity.getId() + " " + topicEntity.getDescription());
        try {
            LOGGER.info("Loading comments");
            List<SearchIndexCommentEntity> searchIndexCommentEntities =
                    searchIndexCommentRepository.findByTopicId(topicEntity.getId());
            LOGGER.info("Loaded: " + searchIndexCommentEntities.size() + " comments");
            LOGGER.info("Building bulk index data");
            // Collections2.transform returns a lazy view: each entity is converted only when
            // the loop below reaches it.
            Collection<IIndexable> comments = transform(searchIndexCommentEntities, new CommentConverter());
            LOGGER.info("Start writing index");
            List<IIndexable> chunk = new LinkedList<IIndexable>();
            for (IIndexable comment : comments) {
                chunk.add(comment);
                if (chunk.size() >= BULK_CHUNK_SIZE) {
                    LOGGER.info("Writing " + BULK_CHUNK_SIZE + " comments to index");
                    elasticSearch.bulk(chunk, INDEX_NAME, DOCUMENT_TYPE);
                    chunk.clear();
                }
            }
            // Flush the remainder. Skip it when empty (no comments, or an exact multiple of
            // BULK_CHUNK_SIZE) because Elasticsearch rejects a bulk request with no actions.
            if (!chunk.isEmpty()) {
                LOGGER.info("Writing " + chunk.size() + " comments to index");
                elasticSearch.bulk(chunk, INDEX_NAME, DOCUMENT_TYPE);
            }
            LOGGER.info("Finished writing index");
        } catch (Exception e) {
            LOGGER.error("Failed to index comments for topic " + topicEntity.getId(), e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment