Skip to content

Instantly share code, notes, and snippets.

@pfmiles
Last active August 6, 2020 03:55
Show Gist options
  • Save pfmiles/bf9091a718498fe6ea552d29df0bb593 to your computer and use it in GitHub Desktop.
Save pfmiles/bf9091a718498fe6ea552d29df0bb593 to your computer and use it in GitHub Desktop.
用于高性能热点执行链路的二级cache实现
/**
 * Singleton placeholder standing in for "no value", typically stored in a cache
 * so that a key known to have no value is still cached instead of being looked
 * up again on every request.
 *
 * <p>It is declared as an enum so that {@code hashCode}/{@code equals} need not
 * be written by hand, deserialization cannot produce additional instances, and
 * no public no-arg constructor is required.
 *
 * @author pf-miles Apr 19, 2016 5:50:03 PM
 */
public enum Null {
    /** The one and only instance. */
    NULL
}
package test;
import static test.Null.NULL;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import test.KvStore;
import test.Constants;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
/**
 * Two-level cache intended for performance-critical execution paths.
 *
 * <p>L1 is an in-process Guava {@link Cache} bounded by {@code maxSize} entries
 * whose entries expire {@code duration} seconds after being written; it cannot
 * be refreshed or cleared through this class. L2 is a {@link KvStore} that can
 * be written to and deleted from explicitly via {@link #put} and
 * {@link #remove}.
 *
 * <p>When the real data loader fails, a {@link test.Null#NULL} placeholder is
 * stored so that the failing loader is not invoked again for a while
 * (automatic error protection).
 *
 * @author pf-miles Apr 20, 2016 7:52:22 PM
 */
public class TwoLevelCache<V> implements InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(TwoLevelCache.class);

    // How long (ms) a placeholder written after a failed real load stays in L2.
    private static final long ERR_PROTECTION_TIME = 30 * 1000;
    // Declared as long so that seconds-to-milliseconds conversion is done in long arithmetic.
    private static final long MILLIS_PER_SECOND = 1000L;

    // Namespace prefixed to every key, isolating data per business domain.
    private String namespace;
    // Maximum number of entries held by the L1 cache.
    private int maxSize = 1024;
    // Expected average number of concurrently accessing threads.
    private int concLevel = Constants.CPU_NUM * 10;
    // L1 expiry period, in seconds.
    private int duration = 30;
    private KvStore l2Cache;
    private Cache<String, Object> l1Cache;
    // L2 expiry period, in seconds; a value <= 0 means entries are stored without an expiry.
    private int l2Duration = -1;

    /**
     * Loads the value for a key, consulting L1, then L2, then the real loader.
     *
     * @param k key; a {@code null} key is logged as an error and yields {@code null}
     * @param realLoader loader invoked when neither cache level holds the key
     * @return the cached or freshly loaded value, or {@code null} when the key has no
     *         value, the real loader failed, or the L1 load raised an
     *         {@link ExecutionException}
     */
    @SuppressWarnings("unchecked") // sole unchecked operation: casting the cached Object back to V
    public V get(String k, final Callable<V> realLoader) {
        if (k == null) {
            log.error("Null key when getting data from cache, illegal!");
            return null;
        }
        final String cacheKey = resolveCacheKey(k);
        Object v;
        try {
            // Try L1 first; on a miss the callable below supplies the value.
            v = this.l1Cache.get(cacheKey, new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    return loadOnL1Miss(cacheKey, realLoader);
                }
            });
        } catch (ExecutionException e) {
            log.error("Error when loading data in L1 cache.", e);
            return null;
        }
        // The placeholder is an internal detail; callers see null instead.
        return NULL.equals(v) ? null : (V) v;
    }

    /**
     * Resolves a value on an L1 miss: reads L2 and, if L2 has nothing, invokes the real
     * loader and writes its result back to L2. Never returns {@code null}; an absent or
     * failed load is represented by the {@code NULL} placeholder. The returned object is
     * what L1 stores, so it stays in L1 for the L1 expiry period.
     */
    private Object loadOnL1Miss(String cacheKey, Callable<V> realLoader) {
        Object o = null;
        try {
            o = l2Cache.get(cacheKey);
        } catch (Exception e) {
            // Best effort: an unavailable L2 must not break the read path.
            log.error("Error when fetching data from L2 cache, ignored.", e);
        }
        if (o != null) {
            return o;
        }

        // Not in L2 either: perform the real load.
        boolean err = false;
        try {
            o = realLoader.call();
        } catch (Throwable t) {
            log.error("Error when invoking real data loader, protecting...", t);
            err = true;
        }
        if (o == null) {
            // Cache the absence too, so that the loader is not hit on every request.
            o = NULL;
        }
        try {
            _put(cacheKey, o, err);
        } catch (Exception e) {
            log.error("Error when putting data to L2 cache, ignored.", e);
        }
        return o;
    }

    /**
     * Writes a value to the cache. Only L2 is written; a value already held by L1 stays
     * visible until it expires there.
     *
     * @param key key; a {@code null} key is logged as an error and rejected
     * @param value value handed to the L2 store as-is
     * @return {@code false} for a {@code null} key, otherwise the result reported by the
     *         L2 store
     */
    public boolean put(String key, V value) {
        if (key == null) {
            log.error("Null key when putting data to cache, illegal!");
            return false;
        }
        return _put(resolveCacheKey(key), value, false);
    }

    /**
     * Writes an already-namespaced key to L2, choosing the expiry.
     *
     * @param err whether {@code value} was produced by a failed load; if so it is stored
     *            for {@link #ERR_PROTECTION_TIME} milliseconds only
     */
    private boolean _put(String cacheKey, Object value, boolean err) {
        if (err) {
            return this.l2Cache.put(cacheKey, value, ERR_PROTECTION_TIME);
        } else if (this.l2Duration > 0) {
            // Multiply in long: int arithmetic would overflow for durations above ~24.8 days.
            return this.l2Cache.put(cacheKey, value, this.l2Duration * MILLIS_PER_SECOND);
        } else {
            return this.l2Cache.put(cacheKey, value);
        }
    }

    /**
     * Deletes a key from the cache. Only L2 is affected; a value already held by L1 stays
     * visible until it expires there.
     *
     * @param key key; a {@code null} key is logged as an error and rejected
     * @return {@code false} for a {@code null} key, otherwise the result reported by the
     *         L2 store
     */
    public boolean remove(String key) {
        if (key == null) {
            log.error("Null key when removing data from cache, illegal!");
            return false;
        }
        return this.l2Cache.delete(resolveCacheKey(key));
    }

    /** Prefixes the caller's key with the configured namespace. */
    private String resolveCacheKey(String k) {
        return this.namespace + "_" + k;
    }

    /**
     * Validates the mandatory configuration and builds the L1 cache.
     *
     * @throws RuntimeException if {@code namespace} or {@code l2Cache} has not been set
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        if (this.namespace == null) {
            throw new RuntimeException("'namespace' must be configured.");
        }
        if (this.l2Cache == null) {
            throw new RuntimeException("'l2Cache' must be injected.");
        }
        this.l1Cache = CacheBuilder.newBuilder()
                .concurrencyLevel(concLevel)
                .maximumSize(maxSize)
                .expireAfterWrite(duration, TimeUnit.SECONDS)
                .build();
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    public void setMaxSize(int maxSize) {
        this.maxSize = maxSize;
    }

    public void setConcLevel(int concLevel) {
        this.concLevel = concLevel;
    }

    public void setDuration(int duration) {
        this.duration = duration;
    }

    public void setL2Cache(KvStore l2Cache) {
        this.l2Cache = l2Cache;
    }

    public void setL2Duration(int l2Duration) {
        this.l2Duration = l2Duration;
    }
}
package test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.test.util.ReflectionTestUtils;
import test.KvStore;
import com.google.common.cache.Cache;
/**
 * Tests for {@link TwoLevelCache}, using an in-memory map in place of the L2 store.
 *
 * @author pf-miles Apr 19, 2016 7:20:16 PM
 */
@SuppressWarnings("unchecked")
public class TwoLevelCacheTest {
    private TwoLevelCache<String> cache = new TwoLevelCache<String>();
    // Number of times the real loader has been invoked, per key.
    private Map<String, Integer> counter = new HashMap<String, Integer>();
    // Stand-in for the L2 cache; expiry arguments are ignored.
    private Map<Object, Object> store = new HashMap<Object, Object>();

    @Before
    public void init() throws Exception {
        cache.setNamespace("testNs");
        cache.setConcLevel(4);
        cache.setL2Cache(new KvStore() {
            @Override
            public <K, V> V get(K key) {
                return (V) store.get(key);
            }

            @Override
            public <K, V> boolean put(K key, V value) {
                store.put(key, value);
                return true;
            }

            @Override
            public <K, V> boolean put(K key, V value, long expire) {
                store.put(key, value);
                return true;
            }

            @Override
            public <K> boolean delete(K key) {
                store.remove(key);
                return true;
            }
        });
        // L1 entries expire after 5 seconds; test() sleeps past that to observe expiry.
        cache.setDuration(5);
        cache.afterPropertiesSet();
    }

    @Test
    public void test() throws InterruptedException {
        // Initial load goes through the real loader.
        Assert.assertEquals("有值", get("hasVal"));
        Assert.assertNull(get("noVal"));
        // Second round is served from the cache.
        Assert.assertEquals("有值", get("hasVal"));
        Assert.assertNull(get("noVal"));
        assertLoadedOnce("hasVal");
        assertLoadedOnce("noVal");

        // Both keys are present in L2 ...
        Assert.assertEquals(2, store.size());
        // ... and in L1.
        Cache<String, Object> l1 = (Cache<String, Object>) ReflectionTestUtils.getField(this.cache,
                "l1Cache");
        Assert.assertEquals(2, l1.size());

        cache.remove("hasVal");
        // remove() only touches L2 ...
        Assert.assertEquals(1, store.size());
        // ... L1 is not cleared by it.
        Assert.assertEquals(2, l1.size());

        // Put a new value for the removed key.
        cache.put("hasVal", "还是有值");
        Assert.assertEquals(2, store.size());
        Assert.assertEquals(2, l1.size());
        // The value seen through the cache does not change immediately, because L1
        // cannot be refreshed explicitly.
        Assert.assertEquals("有值", get("hasVal"));
        // The value held by L2 has changed, though.
        Assert.assertEquals("还是有值", store.get("testNs" + "_" + "hasVal"));
        // None of the reads above may have reached the real loader again.
        assertLoadedOnce("hasVal");
        assertLoadedOnce("noVal");

        // Wait for L1 expiry.
        Thread.sleep(6000);
        List<String> keys = new ArrayList<String>();
        keys.add("hasVal");
        keys.add("noVal");
        Assert.assertEquals(0, l1.getAllPresent(keys).size());
        // Note: calling cache.remove("noVal") followed by get("noVal") at this point
        // would invoke the real loader a second time for that key.
    }

    @Test
    public void testExceptions() {
        Assert.assertNull(get(null));
        Assert.assertFalse(cache.put(null, "test"));
        Assert.assertFalse(cache.remove(null));
    }

    /**
     * Asserts that the real loader ran exactly once for the key. This is checked here, on
     * the test thread, because TwoLevelCache.get catches every Throwable raised by the
     * loader, so an assertion failing inside the loader would be swallowed.
     */
    private void assertLoadedOnce(String key) {
        Assert.assertEquals("Real loader must be invoked exactly once for key: " + key,
                Integer.valueOf(1), counter.get(key));
    }

    /** Reads a key through the cache with a loader that counts its own invocations. */
    private String get(final String k) {
        Callable<String> realLoader = new Callable<String>() {
            @Override
            public String call() throws Exception {
                Integer count = counter.get(k);
                counter.put(k, count == null ? 1 : count + 1);
                if ("hasVal".equals(k)) {
                    return "有值";
                } else {
                    return null;
                }
            }
        };
        return this.cache.get(k, realLoader);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment