@chaudum
Created February 22, 2017 09:32
/*
 * Licensed to Crate under one or more contributor license agreements.
 * See the NOTICE file distributed with this work for additional
 * information regarding copyright ownership. Crate licenses this file
 * to you under the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License. You may
 * obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 *
 * However, if you have executed another commercial license agreement
 * with Crate these terms will supersede the license and you may use the
 * software solely pursuant to the terms of the relevant commercial
 * agreement.
 */
package io.crate.integrationtests;

import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static org.hamcrest.Matchers.is;
@ESIntegTestCase.ClusterScope(numDataNodes = 3)
public class BulkUpsertIntegrationTest extends SQLTransportIntegrationTest {

    @Repeat(iterations = 10)
    @Test
    public void testBulkUpsertIntoPartitionedTable() throws Exception {
        execute("create table t_parted (p int primary key, val int) partitioned by (p) clustered into 3 shards with (number_of_replicas=0)");
        ensureYellow();

        // Initial bulk upsert: 100 rows with distinct primary keys.
        int bulkSize = 100;
        Object[][] bulkArgs = new Object[bulkSize][];
        for (int x = 0; x < bulkSize; x++) {
            bulkArgs[x] = new Object[]{x, 0};
        }
        execute("insert into t_parted (p, val) values (?, ?) on duplicate key update val = val + 1", bulkArgs);
        refresh();
        execute("select count(*) from t_parted");
        assertThat((Long) response.rows()[0][0], is(100L));

        // Run three concurrent bulk upserts of 200 rows each: keys 0-99 are
        // updated, keys 100-199 are inserted into new partitions.
        bulkSize = 200;
        int numThreads = 3;
        final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(numThreads);
        try {
            final CountDownLatch latch = new CountDownLatch(numThreads);
            for (int iter = 0; iter < 3; iter++) {
                final int finalBulkSize = bulkSize;
                final int finalIter = iter;
                scheduledExecutorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        final Object[][] b = new Object[finalBulkSize][];
                        for (int x = 0; x < finalBulkSize; x++) {
                            b[x] = new Object[]{x, finalIter};
                        }
                        execute("insert into t_parted (p, val) values (?, ?) on duplicate key update val = val + 1", b);
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } finally {
            scheduledExecutorService.shutdownNow();
        }
        refresh();

        // After the concurrent upserts all 200 distinct primary keys must exist exactly once.
        execute("select count(*) from t_parted");
        assertThat((Long) response.rows()[0][0], is(200L));
    }
}