Last active
March 17, 2020 02:03
-
-
Save hivefans/a779531e9da5f059a70e to your computer and use it in GitHub Desktop.
Elasticsearch 批量index性能测试|-|{"files":{"build.gradle":{"env":"plain"},"EsPerformanceTest.java":{"env":"plain"}},"tag":"Uncategorized"}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| group 'kingsoft' | |
| version '1.0-SNAPSHOT' | |
| apply plugin: 'java' | |
| sourceCompatibility = 1.8 | |
| repositories { | |
| mavenCentral() | |
| maven { | |
| url "https://maven-central.storage.googleapis.com" | |
| } | |
| } | |
| dependencies { | |
| compile 'org.elasticsearch:elasticsearch:2.3.1' | |
| compile 'org.slf4j:slf4j-api:1.7.21' | |
| compile 'org.slf4j:slf4j-log4j12:1.7.21' | |
| testCompile group: 'junit', name: 'junit', version: '4.11' | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package kingsoft.com; | |
| import org.apache.log4j.BasicConfigurator; | |
| import org.elasticsearch.action.admin.indices.flush.FlushRequest; | |
| import org.elasticsearch.action.bulk.BackoffPolicy; | |
| import org.elasticsearch.action.bulk.BulkProcessor; | |
| import org.elasticsearch.action.bulk.BulkRequest; | |
| import org.elasticsearch.action.bulk.BulkResponse; | |
| import org.elasticsearch.action.index.IndexRequest; | |
| import org.elasticsearch.client.transport.TransportClient; | |
| import org.elasticsearch.common.settings.Settings; | |
| import org.elasticsearch.common.transport.InetSocketTransportAddress; | |
| import org.elasticsearch.common.unit.ByteSizeUnit; | |
| import org.elasticsearch.common.unit.ByteSizeValue; | |
| import org.elasticsearch.common.unit.TimeValue; | |
| import org.elasticsearch.common.xcontent.XContentBuilder; | |
| import org.elasticsearch.common.xcontent.XContentFactory; | |
| import org.slf4j.Logger; | |
| import org.slf4j.LoggerFactory; | |
| import java.io.IOException; | |
| import java.net.InetAddress; | |
| import java.net.UnknownHostException; | |
| import java.util.concurrent.CountDownLatch; | |
| import java.util.concurrent.ExecutorService; | |
| import java.util.concurrent.Executors; | |
| import java.util.concurrent.atomic.AtomicLong; | |
| import java.util.concurrent.atomic.LongAdder; | |
| /** | |
| * Created by shidongjie on 16/4/19. | |
| */ | |
| public class EsPerformanceTest { | |
| private static final Logger logger = LoggerFactory.getLogger(EsPerformanceTest.class); | |
| private static ExecutorService executorService; | |
| private static long totalCount = 1000000; | |
| private static int bulkNum = 1000; | |
| private static int threads = 2 * Runtime.getRuntime().availableProcessors(); | |
| private static CountDownLatch startLatch; | |
| private static CountDownLatch endLatch; | |
| private static AtomicLong totalTime = new AtomicLong(0); | |
| private BulkProcessor bulkProcessor; | |
| private String lastIp; | |
| private TransportClient client; | |
| private LongAdder count = new LongAdder(); | |
| private void initESClient(){ | |
| logger.info("初始化 es client"); | |
| Settings settings = Settings.settingsBuilder() | |
| .put("action.bulk.compress", true) | |
| .put("transport.tcp.compress", true) | |
| .put("cluster.name", "DC-ES-HZ-Cluster").build(); | |
| try { | |
| client = TransportClient.builder().settings(settings).build() | |
| .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.4.69.160"), 9300)) | |
| .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.4.69.161"), 9300)); | |
| } | |
| catch(Exception e) { | |
| e.printStackTrace(); | |
| } | |
| //Client client = TransportClient.builder().settings(settings).build(); | |
| } | |
| public void start(){ | |
| initESClient(); | |
| try { | |
| threads = Integer.valueOf(System.getProperty("thread.nums", String.valueOf(threads))); | |
| bulkNum = Integer.valueOf(System.getProperty("bulk.nums", String.valueOf(bulkNum))); // 创建索引批量提交的个数 | |
| totalCount = Long.valueOf(System.getProperty("total.records", String.valueOf(totalCount))); | |
| InetAddress netAddress = InetAddress.getLocalHost(); | |
| String ip = netAddress.getHostAddress(); | |
| this.lastIp = ip.substring(ip.lastIndexOf(".") + 1); | |
| bulkProcessor = BulkProcessor.builder( | |
| client, | |
| new BulkProcessor.Listener() { | |
| @Override | |
| public void beforeBulk(long executionId, | |
| BulkRequest request) { } | |
| @Override | |
| public void afterBulk(long executionId, | |
| BulkRequest request, | |
| BulkResponse response) { } | |
| @Override | |
| public void afterBulk(long executionId, | |
| BulkRequest request, | |
| Throwable failure) { } | |
| }) | |
| .setBulkActions(10000) | |
| .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) | |
| .setFlushInterval(TimeValue.timeValueSeconds(5)) | |
| .setConcurrentRequests(1) | |
| .setBackoffPolicy( | |
| BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) | |
| .build(); | |
| logger.info("测试线程数:" + threads + ", 总记录数:" + totalCount + ", 索引批量创建个数: " + bulkNum); | |
| executorService = Executors.newFixedThreadPool(threads); | |
| startLatch = new CountDownLatch(1); | |
| endLatch = new CountDownLatch(threads); | |
| creatWorker(); | |
| startLatch.countDown(); | |
| logger.info("启动ES测试..."); | |
| endLatch.await(); | |
| logger.info("总记录数:{}", count.longValue()); | |
| logger.info("关闭bulkProcessor..."); | |
| bulkProcessor.close(); | |
| logger.info("ES测试结束..."); | |
| logger.info("avg time:" + totalTime.longValue()/1000/threads); | |
| logger.info("tps:" + (totalCount/(totalTime.longValue()/1000/threads))); | |
| executorService.shutdown(); | |
| System.exit(0); | |
| } catch(Exception e) { | |
| e.printStackTrace(); | |
| }finally{ | |
| if(client != null) { | |
| FlushRequest request = new FlushRequest("bigdata"); | |
| client.admin().indices().flush(request); | |
| client.close(); | |
| } | |
| } | |
| } | |
| public void creatWorker() { | |
| for(int i = 0; i < threads; i++) { | |
| final int index = i; | |
| executorService.submit(new Runnable() { | |
| public void run() { | |
| try { | |
| startLatch.await(); | |
| long startTime = System.currentTimeMillis(); | |
| creatIndex("T" + index); | |
| long time = System.currentTimeMillis() - startTime; | |
| totalTime.addAndGet(time); | |
| endLatch.countDown(); | |
| } catch (Exception e) { | |
| e.printStackTrace(); | |
| } | |
| } | |
| }); | |
| } | |
| } | |
| private void creatIndex(String prefixKey) { | |
| for(long j = 0; j < totalCount/threads; j++) { | |
| count.increment(); | |
| String key = prefixKey + "-" + this.lastIp + j; | |
| IndexRequest iRequest = new IndexRequest("bigdata", "aaa", key); | |
| try { | |
| XContentBuilder contentBuilder = XContentFactory.jsonBuilder().startObject(); | |
| contentBuilder.field("S_MDN", "S_MDN" + this.lastIp + j); | |
| contentBuilder.field("S_IMSI", "S_IMSI" + this.lastIp + j); | |
| contentBuilder.field("I_PAID_TYPE", "I_PAID_TYPE" + this.lastIp + j); | |
| contentBuilder.field("S_TRML_CODE", "S_TRML_CODE" + this.lastIp + j); | |
| contentBuilder.field("I_FCTY_ID", "I_FCTY_ID" + this.lastIp + j); | |
| contentBuilder.field("S_TM_OS", "S_TM_OS" + this.lastIp + j); | |
| contentBuilder.field("I_MODL_GENR", "I_MODL_GENR" + this.lastIp + j); | |
| contentBuilder.field("S_BSID", "S_BSID" + this.lastIp + j); | |
| contentBuilder.field("S_BSC_CODE", "S_BSC_CODE" + this.lastIp + j); | |
| contentBuilder.field("S_BTS_NAME", "S_BTS_NAME" + this.lastIp + j); | |
| contentBuilder.field("S_CELL_ID", "S_CELL_ID" + this.lastIp + j); | |
| contentBuilder.field("I_CITY_OID", "I_CITY_OID" + this.lastIp + j); | |
| contentBuilder.endObject(); | |
| iRequest.source(contentBuilder); | |
| bulkProcessor.add(iRequest); | |
| } catch (IOException e) { | |
| e.printStackTrace(); | |
| } | |
| } | |
| } | |
| public void stop() { | |
| logger.info("closed"); | |
| } | |
| public static void main(String[] args) { | |
| BasicConfigurator.configure(); | |
| EsPerformanceTest estest = new EsPerformanceTest(); | |
| estest.start(); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment