Created
June 7, 2016 15:05
-
-
Save bbeaudreault/bbec28c502d12148f8657aa3f9fecd4f to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.hadoop.hbase.regionserver.metrics; | |
import java.util.HashMap; | |
import java.util.HashSet; | |
import java.util.Map; | |
import java.util.Set; | |
import java.util.Timer; | |
import java.util.TimerTask; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import org.apache.commons.lang.StringUtils; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.hadoop.hbase.metrics.BaseSourceImpl; | |
import org.apache.hadoop.metrics2.MetricsCollector; | |
import com.yammer.metrics.stats.EWMA; | |
public class RpcHandlerUsageSourceImpl implements RpcHandlerUsageSource { | |
private static Log LOG = LogFactory.getLog(RpcHandlerUsageSourceImpl.class); | |
private static RpcHandlerUsageSourceImpl instance = null; | |
public static synchronized RpcHandlerUsageSourceImpl getInstance() { | |
if (instance == null) { | |
instance = new RpcHandlerUsageSourceImpl().init(); | |
} | |
return instance; | |
} | |
private final Map<String, Map<String, EWMA>> userRpcUsageByPool = new HashMap<>(); | |
private final Map<String, EWMA> idleRpcTimeByPool = new HashMap<>(); | |
private final RpcHandlerThreadRegistry handlerRegistry = new RpcHandlerThreadRegistry(); | |
private final Timer taskRunner = new Timer("RPC Handler Usage Metrics Timer", true); | |
private RpcHandlerQueueUsageMetrics queueUsageMetrics = null; | |
private RpcHandlerUserUsageMetrics userUsageMetrics = null; | |
private RpcHandlerUsageSourceImpl() {} | |
@Override | |
public synchronized void registerRpcHandler(String handlerPool) { | |
if (handlerPool == null) { | |
return; | |
} | |
if (!userRpcUsageByPool.containsKey(handlerPool)) { | |
userRpcUsageByPool.put(handlerPool, new ConcurrentHashMap<String, EWMA>()); | |
} | |
if (!idleRpcTimeByPool.containsKey(handlerPool)) { | |
idleRpcTimeByPool.put(handlerPool, EWMA.oneMinuteEWMA()); | |
} | |
handlerRegistry.registerPool(handlerPool); | |
handlerRegistry.set(handlerPool); | |
} | |
@Override | |
public void countIdlePeriod(long nanosIdle) { | |
String handlerPool = handlerRegistry.get(); | |
if (handlerPool == null) { | |
LOG.warn("Unregistered RPC handler " + Thread.currentThread().getName() + " attempted to log an idle period."); | |
return; // don't accept usage data from unregistered handlers | |
} | |
idleRpcTimeByPool.get(handlerPool).update(nanosIdle); | |
} | |
@Override | |
public void countUsagePeriodForUser(final String username, final long nanosInUse) { | |
String handlerPool = handlerRegistry.get(); | |
if (handlerPool == null) { | |
LOG.warn("Unregistered RPC handler " + Thread.currentThread().getName() + " attempted to log a usage period."); | |
return; // don't accept usage data from unregistered handlers | |
} | |
Map<String, EWMA> queueAveragesByUser = userRpcUsageByPool.get(handlerPool); | |
EWMA userMovingAverage = queueAveragesByUser.get(username); | |
if (userMovingAverage == null) { | |
userMovingAverage = EWMA.oneMinuteEWMA(); | |
queueAveragesByUser.put(username, userMovingAverage); | |
} | |
userMovingAverage.update(nanosInUse); | |
} | |
@Override | |
public void unregisterRpcHandler() { | |
handlerRegistry.remove(); | |
} | |
private RpcHandlerUsageSourceImpl init() { | |
taskRunner.schedule(new EWMATickTask(), TimeUnit.SECONDS.toMillis(5), TimeUnit.SECONDS.toMillis(5)); | |
if (queueUsageMetrics == null ) { | |
this.queueUsageMetrics = new RpcHandlerQueueUsageMetrics(); | |
} | |
if (userUsageMetrics == null) { | |
this.userUsageMetrics = new RpcHandlerUserUsageMetrics(); | |
} | |
return this; | |
} | |
private double getIdlePctOfHandlerPool(String handlerPool) { | |
EWMA idleTimeAverage = idleRpcTimeByPool.get(handlerPool); | |
if (idleTimeAverage == null) { | |
return 100d; | |
} | |
return 100 * (idleRpcTimeByPool.get(handlerPool).rate(TimeUnit.NANOSECONDS) / handlerRegistry.getHandlerCountOfPool(handlerPool)); | |
} | |
private static class RpcHandlerThreadRegistry extends ThreadLocal<String> { | |
/** | |
* Map of QueueTypes -> count of RPC handler threads for that QueueType. | |
* The COMBINED QueueType represents the sum of all RPC handler threads. | |
*/ | |
private final Map<String, AtomicInteger> handlerPoolCounts = new HashMap<>(); | |
public int getHandlerCountOfPool(String handlerPool) { | |
return Math.max(handlerPoolCounts.get(handlerPool).get(), 1); // minimum 1 to prevent potential division by 0 | |
} | |
@Override | |
public void set(String value) { | |
if (value == null) { | |
remove(); | |
} else if (get() == null) { // no-op if this thread has already registered | |
handlerPoolCounts.get(value).incrementAndGet(); | |
super.set(value); | |
} | |
} | |
@Override | |
public void remove() { | |
String current = get(); | |
if (current != null) { | |
handlerPoolCounts.get(current).decrementAndGet(); | |
super.remove(); | |
} | |
} | |
private void registerPool(String handlerPool) { | |
if (!handlerPoolCounts.containsKey(handlerPool)) { | |
handlerPoolCounts.put(handlerPool, new AtomicInteger()); | |
} | |
} | |
} | |
private class EWMATickTask extends TimerTask { | |
@Override | |
public void run() { | |
for (Map<String, EWMA> userEntries : userRpcUsageByPool.values()) { | |
for (EWMA movingAverage : userEntries.values()) { | |
movingAverage.tick(); | |
} | |
} | |
for (EWMA movingAverage : idleRpcTimeByPool.values()) { | |
movingAverage.tick(); | |
} | |
} | |
} | |
private class RpcHandlerQueueUsageMetrics extends BaseSourceImpl { | |
private static final String METRIC_NAME = "RpcHandlerPoolUsage"; | |
RpcHandlerQueueUsageMetrics() { | |
super(METRIC_NAME, DESCRIPTION, METRIC_CONTEXT, "RegionServer,sub="+METRIC_NAME); | |
} | |
@Override | |
public void getMetrics(MetricsCollector metricsCollector, boolean all) { | |
for (String handlerPool : idleRpcTimeByPool.keySet()) { | |
setGauge( | |
handlerPool.toLowerCase().replace('.', '-') + "_handlers_usage_pct", | |
Math.round(100 - getIdlePctOfHandlerPool(handlerPool))); | |
} | |
super.getMetrics(metricsCollector, all); | |
} | |
} | |
private class RpcHandlerUserUsageMetrics extends BaseSourceImpl { | |
private static final String METRIC_NAME = "RpcHandlerUserBreakdown"; | |
private static final double ZERO_THRESHOLD = 0.00001d; // arbitrary value-too-low-to-care-about threshold for usage/metrics | |
private final Set<String> currentZeroMetrics = new HashSet<>(); | |
RpcHandlerUserUsageMetrics() { | |
super(METRIC_NAME, DESCRIPTION, METRIC_CONTEXT, "RegionServer,sub="+METRIC_NAME); | |
} | |
/** | |
* @return true if metrics exists and was set successfully; false if metric was deleted due to consecutive zero values. | |
*/ | |
public synchronized boolean setSelfCleaningGauge(String gaugeName, double value) { | |
if (value <= ZERO_THRESHOLD) { | |
if (currentZeroMetrics.contains(gaugeName)) { | |
// since this metric was already at zero, delete it instead of reporting another 0 | |
removeMetric(gaugeName); | |
return false; | |
} else { | |
currentZeroMetrics.add(gaugeName); | |
} | |
} else { | |
currentZeroMetrics.remove(gaugeName); | |
} | |
setGauge(gaugeName, Math.round(value)); | |
return true; | |
} | |
@Override | |
public void getMetrics(MetricsCollector metricsCollector, boolean all) { | |
for (Map.Entry<String, Map<String, EWMA>> rpcPoolEntry : userRpcUsageByPool.entrySet()) { | |
String handlerPool = rpcPoolEntry.getKey(); | |
// snapshot { user : usage rate } mappings | |
Map<String, Double> userRpcUsageRates = new HashMap<>(rpcPoolEntry.getValue().size()); | |
for (Map.Entry<String, EWMA> userAndUsage : rpcPoolEntry.getValue().entrySet()) { | |
userRpcUsageRates.put(userAndUsage.getKey(), userAndUsage.getValue().rate(TimeUnit.NANOSECONDS)); | |
} | |
double totalUsage = 0d; | |
for (Double usageRate : userRpcUsageRates.values()) { | |
totalUsage += usageRate; | |
} | |
for (Map.Entry<String, Double> userAndUsageRate : userRpcUsageRates.entrySet()) { | |
String username = userAndUsageRate.getKey(); | |
double usagePct = totalUsage > ZERO_THRESHOLD ? 100 * (userAndUsageRate.getValue() / totalUsage) : 0; | |
boolean metricExists = setSelfCleaningGauge( | |
username + "_usage_pct_of_" + handlerPool.toLowerCase().replace('.', '-') + "_handlers", | |
usagePct); | |
if (!metricExists) { | |
// clean up users with consecutive usage rates of zero | |
rpcPoolEntry.getValue().remove(username); | |
} | |
} | |
} | |
super.getMetrics(metricsCollector, all); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment