Created
June 22, 2016 07:01
-
-
Save krosenvold/fa20521ad121a0cbb4c6ed6be91452e5 to your computer and use it in GitHub Desktop.
Concurrency problem with loadCache
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.io.Serializable; | |
import java.util.Collection; | |
import java.util.Map; | |
import java.util.concurrent.CountDownLatch; | |
import javax.cache.Cache; | |
import javax.cache.configuration.Factory; | |
import javax.cache.integration.CacheLoaderException; | |
import javax.cache.integration.CacheWriterException; | |
import org.apache.ignite.Ignite; | |
import org.apache.ignite.IgniteCache; | |
import org.apache.ignite.IgniteCheckedException; | |
import org.apache.ignite.cache.CacheMode; | |
import org.apache.ignite.cache.CachePeekMode; | |
import org.apache.ignite.cache.CacheRebalanceMode; | |
import org.apache.ignite.cache.store.CacheStore; | |
import org.apache.ignite.configuration.CacheConfiguration; | |
import org.apache.ignite.configuration.IgniteConfiguration; | |
import org.apache.ignite.internal.IgnitionEx; | |
import org.apache.ignite.lang.IgniteBiInClosure; | |
import org.apache.ignite.plugin.segmentation.SegmentationPolicy; | |
public class IgniteTester { | |
private static int CNT = 1_500_000; | |
public static void main(String[] args) throws InterruptedException, IOException, IgniteCheckedException { | |
Ignite ignite = IgnitionEx.start(baseConfigNoDiscovery("lonely-testGrid")); | |
final IgniteCache<Integer, String> testCache = ignite.getOrCreateCache(cacheConfig("testCache")); | |
testCache.rebalance().get(); // because ignite 1.5 does not start cache properly replicated | |
populateOrCheck(testCache); | |
populateOrCheck(ignite.getOrCreateCache(cacheConfig("testCache2"))); | |
populateOrCheck(ignite.getOrCreateCache(cacheConfig("testCache3"))); | |
populateOrCheck(ignite.getOrCreateCache(cacheConfig("testCache4"))); | |
populateOrCheck(ignite.getOrCreateCache(cacheConfig("testCache5"))); | |
populateOrCheck(ignite.getOrCreateCache(cacheConfig("testCache6"))); | |
} | |
private static void populateOrCheck(IgniteCache<Integer, String> testCache) throws InterruptedException { | |
if (testCache.localSize(CachePeekMode.PRIMARY, CachePeekMode.BACKUP) == 0) { | |
System.out.print("Cache is empty, loading..."); | |
testCache.loadCache( (k,v) -> true); | |
System.out.println("Data loaded."); | |
} else { | |
int currentSize; | |
while ((currentSize = testCache.localSize(CachePeekMode.PRIMARY, CachePeekMode.BACKUP)) < CNT) { | |
System.out.println("currentSize = " + currentSize + " waiting for cache to get proper size"); | |
Thread.sleep(1000); | |
} | |
for (int i = 0; i < CNT; i++){ | |
final String s = testCache.get(i); | |
if (!s.equals( genString(i))){ | |
throw new IllegalStateException("No good state"); | |
} | |
} | |
System.out.println("We seem to get correct data"); | |
} | |
} | |
private static String genString(int i) { | |
StringBuilder sb = new StringBuilder(); | |
/*for( int j = 0; j < (i/10000); j++){ | |
sb.append(Integer.toString(j)); | |
} */ | |
sb.append("_"); | |
sb.append(Integer.toString(i)); | |
return sb.toString(); | |
} | |
public static CacheConfiguration<Integer, String> cacheConfig(String testCache) { | |
CacheConfiguration<Integer, String> config = new CacheConfiguration<>(); | |
config.setName(testCache); | |
config.setCacheMode(CacheMode.REPLICATED); | |
config.setRebalanceMode(CacheRebalanceMode.SYNC); | |
config.setCacheStoreFactory(new TestFactory()); | |
return config; | |
} | |
public static IgniteConfiguration baseConfigNoDiscovery(String gridName) throws IOException { | |
IgniteConfiguration igniteConfiguration = new IgniteConfiguration(); | |
igniteConfiguration.setFailureDetectionTimeout(30_000); | |
igniteConfiguration.setWorkDirectory(java.nio.file.Files.createTempDirectory("tc").toFile().getAbsolutePath()); | |
igniteConfiguration.setGridName(gridName); | |
igniteConfiguration.setSegmentationPolicy(SegmentationPolicy.NOOP); | |
igniteConfiguration.setMetricsLogFrequency(0); | |
return igniteConfiguration; | |
} | |
public static class TestFactory implements Factory<CacheStore<? super Object, ? super Object>>, Serializable { | |
@Override public CacheStore<? super Object, ? super Object> create() { | |
return new TestCacheStore(); | |
} | |
} | |
static CountDownLatch globalLatch = new CountDownLatch(1); | |
public static class TestCacheStore<Integer, String> implements CacheStore<Integer, String>, Serializable { | |
@Override public void loadCache(IgniteBiInClosure clo, Object... args) throws CacheLoaderException { | |
for (int i = 0; i < CNT; i++){ | |
if (i == 100_000) globalLatch.countDown(); | |
clo.apply(i, genString(i)); | |
} | |
} | |
@Override public void sessionEnd(boolean commit) throws CacheWriterException { | |
} | |
@Override public Object load(Object o) throws CacheLoaderException { | |
return null; | |
} | |
@Override public Map loadAll(Iterable iterable) throws CacheLoaderException { | |
return null; | |
} | |
@Override public void write(Cache.Entry entry) throws CacheWriterException { | |
} | |
@Override public void writeAll(Collection collection) throws CacheWriterException { | |
} | |
@Override public void delete(Object o) throws CacheWriterException { | |
} | |
@Override public void deleteAll(Collection collection) throws CacheWriterException { | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment