Created
May 17, 2020 09:09
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package xxx; | |
import com.alibaba.dubbo.common.URL; | |
import com.alibaba.dubbo.common.extension.ExtensionLoader; | |
import com.alibaba.dubbo.rpc.Invocation; | |
import com.alibaba.dubbo.rpc.Invoker; | |
import com.alibaba.dubbo.rpc.cluster.LoadBalance; | |
import com.alibaba.dubbo.rpc.cluster.loadbalance.AbstractLoadBalance; | |
import com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance; | |
import com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance; | |
import com.google.common.cache.Cache; | |
import com.google.common.cache.CacheBuilder; | |
import com.google.common.collect.Maps; | |
import lombok.extern.slf4j.Slf4j; | |
import java.util.List; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ConcurrentMap; | |
import java.util.concurrent.TimeUnit; | |
import java.util.function.Function; | |
import java.util.stream.Collectors; | |
/** | |
* A 调用 B | |
* 现象:当前使用一致性哈希方式,B 刚启动时,代码没有预热,所有流量打过去,造成启动期间 CPU 高 | |
* 前提:使用随机方式,可以在服务治理配置预热,刚启动的机器权重会小一些,打过去的请求少一些,因此可以让 B 预热(即时编译代码之类的) | |
* 方案:一个 requestId 的第一次请求使用随机,后续同一个 requestId 使用相同的 | |
* 限制:x 和 y 是不同的接口,但是 requestId 相同也要打到同一台机器 | |
* <p> | |
* 后续:当 B 两个接口合并为一个接口的不同方法后,可以缓存 requestId-invoker | |
* | |
* @author youthlin.chen | |
* @date 2019-10-24 15:32 | |
*/ | |
@Slf4j | |
@SuppressWarnings("UnstableApiUsage") | |
public class ReuseSessionLoadBalance extends AbstractLoadBalance { | |
/** | |
* 好像所有实现都是小写的名字呢 | |
*/ | |
@SuppressWarnings("unused") | |
public static final String NAME = "reusesession"; | |
/** | |
* 一个 requestId 对应的提供者机器地址 | |
* 不缓存 requestId-invoker 的原因: | |
* x 接口和 y 接口的 requestId 相同,但 invoker 不同 | |
* 需要打到同一台机器(address 相同就是同一台机器) | |
*/ | |
private static Cache<String, String> requestToAddress; | |
static { | |
resetCache(/*minutes 配置的值*/15L);//省略内部获取配置代码,配置变更实时 reset | |
} | |
/** | |
* 重设缓存时间 | |
*/ | |
private static void resetCache(long minutes) { | |
Cache<String, String> newCache = newCache(minutes); | |
if (requestToAddress != null) { | |
// 原有的不能丢了 | |
newCache.putAll(requestToAddress.asMap()); | |
} | |
requestToAddress = newCache; | |
log.info("reset_cache, minutes={} count={}", minutes, newCache.size()); | |
} | |
private static <K, V> Cache<K, V> newCache(long minutes) { | |
return CacheBuilder.newBuilder() | |
.expireAfterAccess(minutes, TimeUnit.MINUTES) | |
.removalListener(n -> /*记个监控.recordOne("reuseSession_cache_remove_cause_" + n.getCause())*/;) | |
.build(); | |
} | |
/** | |
* 第一次请求使用随机 刚启动预热中的机器权重更低 | |
*/ | |
private LoadBalance randomLoadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class) | |
.getExtension(RandomLoadBalance.NAME); | |
/** | |
* 开关关闭使用默认的一致性哈希 | |
*/ | |
private LoadBalance consistentHashLoadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class) | |
.getExtension(ConsistentHashLoadBalance.NAME); | |
/** | |
* 一个服务的方法对应的提供方列表: key 为 serviceKey.methodName value 为该方法对应的所有提供方 | |
* 因为缓存的是机器 address(即 host:port) 而我们需要返回 invoker | |
* 为了不用每次遍历 invokers 找到相应 address 的 invoker | |
* 所以使用 Selector 包一下,提前做好 address - invoker 映射 | |
* 在 invokers 变化时(identityHashCode 改变)重新缓存映射关系 | |
*/ | |
private ConcurrentMap<String, Selector<?>> selectors = Maps.newConcurrentMap(); | |
@Override | |
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { | |
long start = System.currentTimeMillis(); | |
try { | |
// 取第一个参数作为会话的标识, 即: requestId | |
final String requestId = invocation.getArguments()[0].toString(); | |
// 开关关闭用原来的一致性哈希 | |
if (!/*开关默认关闭*/false | |
&& requestToAddress.getIfPresent(requestId) == null) { | |
if (requestToAddress.size() > 0) { | |
// cleanUp 清除过期的 key, 全部清除是 invalidateAll | |
requestToAddress.cleanUp(); | |
} | |
if (requestToAddress.size() == 0) { | |
selectors.clear(); | |
} | |
//记个监控 | |
return consistentHashLoadBalance.select(invokers, url, invocation); | |
} | |
// 不同接口不同 key: group/interface:version | |
String serviceKey = invokers.get(0).getUrl().getServiceKey(); | |
@SuppressWarnings("unchecked") | |
Selector<T> selector = (Selector<T>) selectors.get(serviceKey); | |
int identityHashCode = System.identityHashCode(invokers); | |
// hash 值不一样说明提供者有变化 需要重新缓存提供者列表 | |
if (selector == null || selector.identityHashCode != identityHashCode) { | |
//记个监控 recordOne("reuseSession_do_select_put_selector"); | |
selector = new Selector<>(invokers, identityHashCode); | |
selectors.put(serviceKey, selector); | |
} | |
final String address = requestToAddress.get(requestId, () -> { | |
// 记个监控recordOne("reuseSession_do_select_first_requestId"); | |
// 第一次随机选一台 (guava cache 机制 多个线程同时 load 只有一个线程会进来) | |
return randomLoadBalance.select(invokers, url, invocation).getUrl().getAddress(); | |
}); | |
//记个监控recordOne("reuseSession_new"); | |
return selector.computeInvokerIfAbsent(address, (adr) -> { | |
// address 是缓存的,但 invoker 已下线 此时需要 reselect | |
// 这时 selector map 里 address 是下线的,但对应的 invoker 是在线的 | |
// e.g.: 127.0.0.1 是缓存里的 address, 对应机器已下线; | |
// 然后这里重新选择使该 address 对应了正确的 invoker 192.168.0.1 | |
Invoker<T> invoker = randomLoadBalance.select(invokers, url, invocation); | |
requestToAddress.put(requestId, invoker.getUrl().getAddress()); | |
return invoker; | |
} | |
); | |
} catch (Exception e) { | |
//记个监控recordOne("reuseSession_do_select_error"); | |
log.warn("reuseSession_do_select_error", e); | |
// 兜底 | |
return consistentHashLoadBalance.select(invokers, url, invocation); | |
} finally { | |
//记个监控recordOne("reuseSession_do_select", System.currentTimeMillis() - start); | |
} | |
} | |
private static class Selector<T> { | |
private final ConcurrentHashMap<String, Invoker<T>> map; | |
private final long identityHashCode; | |
private Selector(List<Invoker<T>> invokers, long identityHashCode) { | |
this.map = invokers.stream() | |
.collect(Collectors.toMap( | |
invoker -> invoker.getUrl().getAddress(), | |
Function.identity(), | |
(older, newer) -> { | |
// 每个提供者的地址都是不同的,所以这里不应该发生 | |
//记个监控.recordOne("reuseSession_selector_duplicated_address_invoker"); | |
return newer; | |
}, | |
ConcurrentHashMap::new | |
)); | |
this.identityHashCode = identityHashCode; | |
} | |
private Invoker<T> computeInvokerIfAbsent(String address, Function<String, Invoker<T>> reselect) { | |
// ConcurrentHashMap 保证同一个 key 的 mappingFunction 只有一个会执行 | |
// doc: the function is applied at most once per key | |
return map.computeIfAbsent(address, reselect); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment