Created
December 4, 2013 05:27
-
-
Save kennycason/7782807 to your computer and use it in GitHub Desktop.
Elasticsearch Comment Importer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package app.insights.search.elasticsearch; | |
| import app.insights.search.elasticsearch.query.IQuery; | |
| import app.insights.search.elasticsearch.types.IIndexable; | |
| import io.searchbox.client.JestClient; | |
| import io.searchbox.client.JestClientFactory; | |
| import io.searchbox.client.JestResult; | |
| import io.searchbox.client.config.ClientConfig; | |
| import io.searchbox.core.Bulk; | |
| import io.searchbox.core.Index; | |
| import io.searchbox.core.Search; | |
| import io.searchbox.indices.CreateIndex; | |
| import io.searchbox.indices.DeleteIndex; | |
| import org.apache.log4j.Logger; | |
| import java.util.List; | |
| import java.util.Map; | |
| /** | |
| * Created by kenny on 12/2/13. | |
| */ | |
| public class ElasticSearch { | |
| private static final Logger LOGGER = Logger.getLogger(ElasticSearch.class); | |
| private final String API_URL; | |
| private JestClient client; | |
| private boolean connected = false; | |
| public ElasticSearch(String apiUrl) { | |
| this.API_URL = apiUrl; | |
| } | |
| private void connect() { | |
| try { | |
| // Configuration | |
| ClientConfig clientConfig = new ClientConfig.Builder(API_URL).multiThreaded(true).build(); | |
| // Construct a new Jest client according to configuration via factory | |
| JestClientFactory factory = new JestClientFactory(); | |
| factory.setClientConfig(clientConfig); | |
| client = factory.getObject(); | |
| connected = true; | |
| } catch(Exception e) { | |
| LOGGER.error(e.getMessage(), e); | |
| } | |
| } | |
| public void deleteIndex(String index) { | |
| if(!connected) { connect(); } | |
| try { | |
| client.execute(new DeleteIndex.Builder(index).build()); | |
| } catch(Exception e) { | |
| LOGGER.error(e.getMessage(), e); | |
| } | |
| } | |
| public void createIndex(String index) { | |
| createIndex(index, null); | |
| } | |
| public void createIndex(String index, Map<String, String> settings) { | |
| if(!connected) { connect(); } | |
| try { | |
| if(settings == null) { | |
| client.execute(new CreateIndex.Builder(index).build()); | |
| } else { | |
| client.execute(new CreateIndex.Builder(index).settings(settings).build()); | |
| } | |
| } catch(Exception e) { | |
| LOGGER.error(e.getMessage(), e); | |
| } | |
| } | |
| public void bulk(List<IIndexable> indexables, String index, String type) { | |
| if(!connected) { connect(); } | |
| try { | |
| LOGGER.info("Building bulk index data"); | |
| Bulk.Builder bulkBuilder = new Bulk.Builder(); | |
| for(IIndexable indexable : indexables) { | |
| bulkBuilder.addAction(new Index.Builder(indexable).index(index).type(type).build()); | |
| } | |
| client.execute(bulkBuilder.build()); | |
| } catch(Exception e) { | |
| LOGGER.error(e.getMessage(), e); | |
| } | |
| } | |
| public JestResult search(IQuery query) { | |
| if(!connected) { connect(); } | |
| try { | |
| Search search = new Search.Builder(query.toJson()) | |
| .addIndex(query.getIndex()) | |
| .build(); | |
| return client.execute(search); | |
| } catch(Exception e) { | |
| LOGGER.error(e.getMessage(), e); | |
| } | |
| return null; | |
| } | |
| public static ElasticSearch buildDefault() { | |
| return new ElasticSearch("https://<MY_HASH>.qbox.io/"); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package app.insights.search.elasticsearch; | |
| import app.insights.search.elasticsearch.types.IIndexable; | |
| import app.models.entities.SearchIndexCommentEntity; | |
| import app.models.entities.TopicEntity; | |
| import app.models.repositories.SearchIndexCommentRepository; | |
| import app.models.repositories.TopicRepository; | |
| import lib.spring.DefaultSpringContextLoader; | |
| import org.apache.log4j.Logger; | |
| import org.springframework.beans.factory.annotation.Autowired; | |
| import org.springframework.context.ApplicationContext; | |
| import org.springframework.stereotype.Service; | |
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import static com.google.common.collect.Collections2.transform;
| /** | |
| * Created by kenny on 11/25/13. | |
| */ | |
| @Service | |
| public class ElasticSearchTopicImporter { | |
| private static final Logger LOGGER = Logger.getLogger(ElasticSearchTopicImporter.class); | |
| private static final int BULK_CHUNK_SIZE = 500; | |
| @Autowired | |
| private SearchIndexCommentRepository searchIndexCommentRepository; | |
| @Autowired | |
| private TopicRepository topicRepository; | |
| private final ElasticSearch elasticSearch; | |
| public static void main(String[] args) { | |
| ApplicationContext applicationContext = DefaultSpringContextLoader.loadMasterProductionApplicationContext(); | |
| applicationContext.getBean(ElasticSearchTopicImporter.class).indexAll(); | |
| } | |
| public ElasticSearchTopicImporter() { | |
| elasticSearch = ElasticSearch.buildDefault(); | |
| /* | |
| elasticSearch.deleteIndex("comments"); | |
| Map<String, String> settings = new HashMap<String, String>(); | |
| settings.put("number_of_shards", "2"); | |
| settings.put("number_of_replicas", "0"); | |
| elasticSearch.createIndex("comments", settings); | |
| */ | |
| } | |
| public void indexAll() { | |
| for(TopicEntity topicEntity : this.topicRepository.findAll()) { | |
| if(topicEntity.getId() >= 1) | |
| index(topicEntity); | |
| } | |
| } | |
| public void index(TopicEntity topicEntity) { | |
| LOGGER.info("Indexing topic: " + topicEntity.getId() + " " + topicEntity.getDescription()); | |
| try { | |
| /* StringWriter sw = new StringWriter(); | |
| new JSONWriter(sw) | |
| .object() | |
| .key("row_id").value("T_11111") | |
| .key("user").value("kenny") | |
| .key("date").value("2013-11-30") | |
| .key("comment").value("trying out Elastic Search") | |
| .key("topic_id").value("1") | |
| .key("rating").value("5") | |
| .endObject(); | |
| //LOGGER.info(sw.toString()); */ | |
| LOGGER.info("Loading comments"); | |
| List<SearchIndexCommentEntity> searchIndexCommentEntities = this.searchIndexCommentRepository.findByTopicId(topicEntity.getId()); | |
| LOGGER.info("Loaded: " + searchIndexCommentEntities.size() + " comments"); | |
| LOGGER.info("Building bulk index data"); | |
| Collection<IIndexable> comments = transform(searchIndexCommentEntities, new CommentConverter()); | |
| LOGGER.info("Start writing index"); | |
| List<IIndexable> indexables = new LinkedList<IIndexable>(); | |
| for(IIndexable comment : comments) { | |
| indexables.add(comment); | |
| if(indexables.size() >= BULK_CHUNK_SIZE) { | |
| LOGGER.info("Writing " + BULK_CHUNK_SIZE + " comments to index"); | |
| elasticSearch.bulk(indexables, "comments", "comment"); | |
| indexables.clear(); | |
| } | |
| } | |
| elasticSearch.bulk(indexables, "comments", "comment"); | |
| LOGGER.info("Finished writing index"); | |
| } catch(Exception e) { | |
| LOGGER.error(e.getMessage(), e); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment