Created
October 4, 2012 15:11
-
-
Save remeniuk/3834258 to your computer and use it in GitHub Desktop.
modified HTablePool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class HTablePool { | |
private static final int RESIZE_TABLE_POOL_INTERVAL_SEC = (int) TimeUnit.MINUTES.toSeconds(30); | |
private static final int REPORT_CACHED_CONNECTIONS_COUNT_INTERVAL_SEC = (int) TimeUnit.MINUTES.toSeconds(2); | |
private static final Log log = LogFactory.getLog(HTablePool.class); | |
protected final ConcurrentMap<String, LinkedList<HTableInterface>> tables = new ConcurrentHashMap<String, LinkedList<HTableInterface>>(); | |
private final Configuration config; | |
private final HTableInterfaceFactory tableFactory; | |
/**
 * Per-table histograms of active table connections, opened through HTablePool (either taken from pool, or
 * initiated from scratch)
 * <p/>
 * Required to generate time series on connections to different tables, and to estimate average load on the table,
 * required for calculation of the pool size
 * <p/>
 * Not thread safe - call in a synchronized context
 */
private final ConcurrentMap<String, Histogram> activeTableConnectionsHistograms = new ConcurrentHashMap<String, Histogram>(); | |
/**
 * Current number of active table connections per table, opened through HTablePool (either taken from pool, or
 * initiated from scratch)
 */
private final ConcurrentMap<String, AtomicLong> activeTableConnectionsCount = new ConcurrentHashMap<String, AtomicLong>(); | |
/** | |
* Total number of connections, taken from pool | |
*/ | |
private final AtomicLong poolConnections = new AtomicLong(0); | |
@Resource(name = "scheduler") | |
private Scheduler nettyScheduler; | |
/** | |
* Histogram of connections, taken from pool | |
*/ | |
@Resource(name = "poolConnectionsHistogram") | |
private Histogram poolConnectionsHistogram; | |
/** | |
* Histogram of idle connections, stored in cache | |
*/ | |
@Resource(name = "cachedConnectionsHistogram") | |
private Histogram cachedConnectionsHistogram; | |
@Resource(name = "hBaseMetricsRegistry") | |
private MetricsRegistry metricsRegistry; | |
/** | |
* Maximum number of connections that can be opened through pool (both initiated from scratch and pooled) | |
* Must be significantly smaller than ZooKeeper connections count (not all connections are opened through | |
* HTablePool) | |
*/ | |
private final int maxConnectionsCount; | |
/** | |
* Maximum size of the queue of connections to a specific table | |
*/ | |
protected final int maxTableConnectionsCount; | |
/** | |
* Core/minimal size of the table pool | |
*/ | |
protected final int coreTablePoolSize; | |
/**
 * Default constructor: delegates to the four-argument constructor with no configuration and
 * {@link Integer#MAX_VALUE} for the core pool size, the total connection limit and the per-table limit.
 */
protected HTablePool() {
    this(
            null,
            Integer.MAX_VALUE,
            Integer.MAX_VALUE,
            Integer.MAX_VALUE);
}
/**
 * Creates a pool that uses a private copy of the specified configuration and the given size limits.
 *
 * @param config                   configuration to copy; when {@code null}, no configuration is stored
 * @param corePoolSize             minimum number of references to keep for each table
 * @param maxConnectionsCount      maximum number of connections that can be opened through the pool
 * @param maxTableConnectionsCount maximum size of the queue of connections to a specific table
 */
public HTablePool(final Configuration config, final int corePoolSize, final int maxConnectionsCount,
        final int maxTableConnectionsCount) {
    // Keep a private copy of the configuration; this copy is what gets handed to
    // HConnectionManager.deleteConnection when a table pool is closed.
    if (config != null) {
        this.config = new Configuration(config);
    } else {
        this.config = null;
    }
    this.maxTableConnectionsCount = maxTableConnectionsCount;
    this.maxConnectionsCount = maxConnectionsCount;
    this.coreTablePoolSize = corePoolSize;
    this.tableFactory = new HTableFactory();
}
@PostConstruct | |
private void init() { | |
// check if we have cached connections that we no longer need (when load on the table goes down) | |
nettyScheduler.startRecurrentTimer(new SchedulerJob() { | |
@Override | |
public void run() { | |
// if we're out of connections, cleanup the pool, and start over again | |
if (isOutOfConnections()) { | |
log.error("[HTablePool] is out of connections - cleaning up..."); | |
closeTablePool(); | |
} else { | |
resizeTablePool(); | |
} | |
} | |
}, RESIZE_TABLE_POOL_INTERVAL_SEC, RESIZE_TABLE_POOL_INTERVAL_SEC); | |
// push cached connections count time series | |
nettyScheduler.startRecurrentTimer(new SchedulerJob() { | |
@Override | |
public void run() { | |
cachedConnectionsHistogram.update(size()); | |
} | |
}, REPORT_CACHED_CONNECTIONS_COUNT_INTERVAL_SEC, REPORT_CACHED_CONNECTIONS_COUNT_INTERVAL_SEC); | |
} | |
/** | |
* Not thread safe - call in a synchronized context | |
*/ | |
private void incActiveConnectionsCount(String table) { | |
poolConnectionsHistogram.update(poolConnections.incrementAndGet()); | |
addConnectionsCount(table, 1); | |
} | |
/** | |
* Not thread safe - call in a synchronized context | |
*/ | |
private void decActiveConnectionsCount(String table) { | |
poolConnections.decrementAndGet(); | |
addConnectionsCount(table, -1); | |
} | |
private void addConnectionsCount(String table, Integer k) { | |
AtomicLong count = activeTableConnectionsCount.get(table); | |
if (count == null) { | |
activeTableConnectionsCount.putIfAbsent(table, new AtomicLong(0)); | |
count = activeTableConnectionsCount.get(table); | |
} | |
count.addAndGet(k); | |
Histogram activeTableConnectionsHistogram = activeTableConnectionsHistograms.get(table); | |
if (activeTableConnectionsHistogram == null) { | |
activeTableConnectionsHistograms.putIfAbsent(table, metricsRegistry.newHistogram(HTablePool.class, table, true)); | |
activeTableConnectionsHistogram = activeTableConnectionsHistograms.get(table); | |
} | |
activeTableConnectionsHistogram.update(count.get()); | |
} | |
/** | |
* Returns true, if we cannot open new connections anymore. In this case, all pooled connections should be | |
* immediately released, so that new connections can be opened | |
* Not thread safe. Must be called in a synchronized context | |
*/ | |
private boolean isOutOfConnections() { | |
return (poolConnections.get() + size()) >= maxConnectionsCount; | |
} | |
/** | |
* Get a reference to the specified table from the pool. | |
* <p/> | |
* <p/> | |
* Create a new one if one is not available. | |
* | |
* @param tableName table name | |
* @return a reference to the specified table | |
* @throws RuntimeException if there is a problem instantiating the HTable | |
*/ | |
public HTableInterface getTable(String tableName) { | |
LinkedList<HTableInterface> queue = tables.get(tableName); | |
try { | |
if (queue == null) { | |
tables.putIfAbsent(tableName, new LinkedList<HTableInterface>()); | |
return createHTable(tableName); | |
} | |
HTableInterface table; | |
synchronized (queue) { | |
table = queue.poll(); | |
} | |
if (table == null) { | |
return createHTable(tableName); | |
} | |
return table; | |
} finally { | |
incActiveConnectionsCount(tableName); | |
} | |
} | |
/** | |
* Returns mean table load for the last 5 minutes | |
*/ | |
private Integer getTableMeanLoad(String tableName) { | |
return (int) activeTableConnectionsHistograms.get(tableName).mean(); | |
} | |
/** | |
* Puts the specified HTable back into the pool. | |
* <p/> | |
* <p/> | |
* If the pool already contains <i>coreTablePoolSize</i> references to the table, then nothing happens. | |
* | |
* @param table table | |
*/ | |
public void putTable(HTableInterface table) { | |
String tableName = Bytes.toString(table.getTableName()); | |
LinkedList<HTableInterface> queue = tables.get(tableName); | |
synchronized (queue) { | |
// if the table is actively used at the moment, we cache the connections, instead of releasing them | |
if (queue.size() >= Math.max(coreTablePoolSize, | |
Math.min(maxTableConnectionsCount, getTableMeanLoad(tableName)))) { | |
// release table instance since we're not reusing it | |
releaseTable(tableName, table, true); | |
} else { | |
releaseTable(tableName, table, false); | |
queue.add(table); | |
} | |
} | |
} | |
/** | |
* Must be called to correctly release stale table connection | |
*/ | |
public void releaseTable(String tableName, HTableInterface table, boolean releaseConnection) { | |
try { | |
decActiveConnectionsCount(tableName); | |
if (releaseConnection) { | |
// release table instance since we're not reusing it | |
this.tableFactory.releaseHTableInterface(table); | |
} | |
} catch (Exception e) { | |
log.error("Failed to release table!", e); | |
} | |
} | |
protected HTableInterface createHTable(String tableName) { | |
return this.tableFactory.createHTableInterface(config, Bytes.toBytes(tableName)); | |
} | |
/** | |
* Removes pooled HTable connections, when table load goes down | |
*/ | |
private void resizeTablePool() { | |
log.info("[HTablePool] cleaning up unused connections..."); | |
for (String tableName : activeTableConnectionsCount.keySet()) { | |
Integer tableMeanLoad = getTableMeanLoad(tableName); | |
LinkedList<HTableInterface> queue = tables.get(tableName); | |
synchronized (queue) { | |
int tablePoolSize = queue.size(); | |
if (tablePoolSize > tableMeanLoad && tablePoolSize > coreTablePoolSize) { | |
log.info(String.format("[HTablePool] for [%s] is too big: resizing from %s to %s...", tableName, tablePoolSize, tableMeanLoad)); | |
int poolOversized = tablePoolSize - tableMeanLoad; | |
while (poolOversized > 0) { | |
HTableInterface table = queue.poll(); | |
this.tableFactory.releaseHTableInterface(table); | |
poolOversized--; | |
} | |
} | |
} | |
} | |
} | |
/** | |
* Releases all pooled connections | |
*/ | |
public void closeTablePool() { | |
int totalClosedTablesCount = 0; | |
for (String tableName : tables.keySet()) { | |
totalClosedTablesCount += closeTablePool(tableName, false); | |
} | |
log.info(String.format("[HTablePool] closed %s unused connections!", totalClosedTablesCount)); | |
} | |
/** | |
* Closes all the HTable instances , belonging to the given table, in the table pool. | |
* <p/> | |
* Note: this is a 'shutdown' of the given table pool and different from | |
* {@link #putTable(HTableInterface)}, that is used to return the table instance to the pool for future | |
* re-use. | |
* | |
* @param tableName | |
*/ | |
public int closeTablePool(final String tableName) { | |
return closeTablePool(tableName, true); | |
} | |
public int closeTablePool(final String tableName, boolean shouldCloseConnection) { | |
int closedTablesCount = 0; | |
Queue<HTableInterface> queue = tables.get(tableName); | |
if (queue != null) { | |
synchronized (queue) { | |
log.info(String.format("[HTablePool] %s has %s unused connections!", tableName, queue.size())); | |
HTableInterface table = queue.poll(); | |
while (table != null) { | |
this.tableFactory.releaseHTableInterface(table); | |
table = queue.poll(); | |
closedTablesCount++; | |
} | |
log.info(String.format("[HTablePool] closed %s connections to %s", tableName, closedTablesCount - queue.size())); | |
} | |
} | |
if (shouldCloseConnection) { | |
HConnectionManager.deleteConnection(this.config, true); | |
} | |
return closedTablesCount; | |
} | |
public int size(String tableName) { | |
Queue<HTableInterface> queue = tables.get(tableName); | |
return queue != null ? queue.size() : 0; | |
} | |
public int size() { | |
int poolSize = 0; | |
for (LinkedList<HTableInterface> tablePool : getTables().values()) { | |
poolSize += tablePool.size(); | |
} | |
return poolSize; | |
} | |
ConcurrentMap<String, LinkedList<HTableInterface>> getTables() { | |
return this.tables; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment