Skip to content

Instantly share code, notes, and snippets.

@remeniuk
Created October 4, 2012 15:11
Show Gist options
  • Save remeniuk/3834258 to your computer and use it in GitHub Desktop.
Save remeniuk/3834258 to your computer and use it in GitHub Desktop.
modified HTablePool
/**
 * Pool of {@link HTableInterface} references, keyed by table name.
 * <p>
 * In addition to caching idle table references, the pool keeps per-table counters and histograms of the
 * connections currently handed out, and uses the histogram mean to decide how many idle references are worth
 * keeping for each table. Two recurrent scheduler jobs (registered in {@link #init()}) periodically shrink
 * the pool and report the number of cached connections.
 * <p>
 * Each per-table queue is guarded by its own monitor; counters are {@link AtomicLong}s held in concurrent maps.
 */
public class HTablePool {

    private static final int RESIZE_TABLE_POOL_INTERVAL_SEC = (int) TimeUnit.MINUTES.toSeconds(30);
    private static final int REPORT_CACHED_CONNECTIONS_COUNT_INTERVAL_SEC = (int) TimeUnit.MINUTES.toSeconds(2);
    private static final Log log = LogFactory.getLog(HTablePool.class);

    /**
     * Idle (cached) table references per table name. Every access to a queue's contents must be done while
     * holding that queue's monitor.
     */
    protected final ConcurrentMap<String, LinkedList<HTableInterface>> tables = new ConcurrentHashMap<String, LinkedList<HTableInterface>>();

    private final Configuration config;
    private final HTableInterfaceFactory tableFactory;

    /**
     * Histograms of active table connections, opened through HTablePool (either taken from pool, or initiated
     * from scratch), one per table.
     * <p>
     * Required to generate time series on connections to different tables, and to estimate average load on the
     * table, required for calculation of the pool size.
     */
    private final ConcurrentMap<String, Histogram> activeTableConnectionsHistograms = new ConcurrentHashMap<String, Histogram>();

    /**
     * Current number of active table connections, opened through HTablePool (either taken from pool, or
     * initiated from scratch), one counter per table.
     */
    private final ConcurrentMap<String, AtomicLong> activeTableConnectionsCount = new ConcurrentHashMap<String, AtomicLong>();

    /**
     * Total number of connections currently handed out by {@link #getTable(String)} and not yet returned.
     */
    private final AtomicLong poolConnections = new AtomicLong(0);

    @Resource(name = "scheduler")
    private Scheduler nettyScheduler;

    /**
     * Histogram of connections, taken from pool
     */
    @Resource(name = "poolConnectionsHistogram")
    private Histogram poolConnectionsHistogram;

    /**
     * Histogram of idle connections, stored in cache
     */
    @Resource(name = "cachedConnectionsHistogram")
    private Histogram cachedConnectionsHistogram;

    @Resource(name = "hBaseMetricsRegistry")
    private MetricsRegistry metricsRegistry;

    /**
     * Maximum number of connections that can be opened through pool (both initiated from scratch and pooled)
     * Must be significantly smaller than ZooKeeper connections count (not all connections are opened through
     * HTablePool)
     */
    private final int maxConnectionsCount;

    /**
     * Maximum size of the queue of connections to a specific table
     */
    protected final int maxTableConnectionsCount;

    /**
     * Core/minimal size of the table pool
     */
    protected final int coreTablePoolSize;

    /**
     * Default constructor: no configuration and effectively unbounded limits.
     */
    protected HTablePool() {
        this(null, Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE);
    }

    /**
     * Creates a pool that uses a private copy of the specified configuration.
     *
     * @param config                   configuration to copy; may be {@code null}
     * @param corePoolSize             minimum number of references to keep for each table
     * @param maxConnectionsCount      maximum number of connections (active plus cached) before the pool is
     *                                 considered out of connections
     * @param maxTableConnectionsCount maximum number of references cached for a single table
     */
    public HTablePool(final Configuration config, final int corePoolSize, final int maxConnectionsCount,
                      final int maxTableConnectionsCount) {
        // Make a new configuration instance so it can be safely cleaned up when
        // done with the pool.
        this.config = config == null ? null : new Configuration(config);
        this.coreTablePoolSize = corePoolSize;
        this.maxConnectionsCount = maxConnectionsCount;
        this.maxTableConnectionsCount = maxTableConnectionsCount;
        this.tableFactory = new HTableFactory();
    }

    /**
     * Registers the two recurrent scheduler jobs: pool resizing/cleanup and cached-connections reporting.
     */
    @PostConstruct
    private void init() {
        // check if we have cached connections that we no longer need (when load on the table goes down)
        nettyScheduler.startRecurrentTimer(new SchedulerJob() {
            @Override
            public void run() {
                // if we're out of connections, cleanup the pool, and start over again
                if (isOutOfConnections()) {
                    log.error("[HTablePool] is out of connections - cleaning up...");
                    closeTablePool();
                } else {
                    resizeTablePool();
                }
            }
        }, RESIZE_TABLE_POOL_INTERVAL_SEC, RESIZE_TABLE_POOL_INTERVAL_SEC);

        // push cached connections count time series
        nettyScheduler.startRecurrentTimer(new SchedulerJob() {
            @Override
            public void run() {
                cachedConnectionsHistogram.update(size());
            }
        }, REPORT_CACHED_CONNECTIONS_COUNT_INTERVAL_SEC, REPORT_CACHED_CONNECTIONS_COUNT_INTERVAL_SEC);
    }

    /**
     * Records that one more connection to the given table has been handed out: bumps the global counter
     * (pushing its new value to the pool histogram) and the per-table counter.
     */
    private void incActiveConnectionsCount(String table) {
        poolConnectionsHistogram.update(poolConnections.incrementAndGet());
        addConnectionsCount(table, 1);
    }

    /**
     * Records that one connection to the given table has been returned: decrements the global counter and
     * the per-table counter.
     */
    private void decActiveConnectionsCount(String table) {
        poolConnections.decrementAndGet();
        addConnectionsCount(table, -1);
    }

    /**
     * Adds {@code delta} to the per-table active-connections counter and pushes the resulting value to the
     * table's histogram, creating the counter and the histogram on first use.
     */
    private void addConnectionsCount(String table, int delta) {
        AtomicLong count = activeTableConnectionsCount.get(table);
        if (count == null) {
            AtomicLong fresh = new AtomicLong(0);
            // putIfAbsent returns the winner of a concurrent registration, or null if ours was stored
            count = activeTableConnectionsCount.putIfAbsent(table, fresh);
            if (count == null) {
                count = fresh;
            }
        }
        // report exactly the value produced by this update, not whatever the counter holds a moment later
        long current = count.addAndGet(delta);

        Histogram histogram = activeTableConnectionsHistograms.get(table);
        if (histogram == null) {
            Histogram fresh = metricsRegistry.newHistogram(HTablePool.class, table, true);
            histogram = activeTableConnectionsHistograms.putIfAbsent(table, fresh);
            if (histogram == null) {
                histogram = fresh;
            }
        }
        histogram.update(current);
    }

    /**
     * Returns true, if we cannot open new connections anymore. In this case, all pooled connections should be
     * immediately released, so that new connections can be opened.
     * <p>
     * The result is computed from the active-connections counter plus {@link #size()}, neither of which is
     * read under a lock, so it is a best-effort snapshot.
     */
    private boolean isOutOfConnections() {
        return (poolConnections.get() + size()) >= maxConnectionsCount;
    }

    /**
     * Returns the queue of idle references for the given table, registering an empty one if none exists yet.
     */
    private LinkedList<HTableInterface> queueFor(String tableName) {
        LinkedList<HTableInterface> queue = tables.get(tableName);
        if (queue == null) {
            LinkedList<HTableInterface> fresh = new LinkedList<HTableInterface>();
            queue = tables.putIfAbsent(tableName, fresh);
            if (queue == null) {
                queue = fresh;
            }
        }
        return queue;
    }

    /**
     * Get a reference to the specified table from the pool.
     * <p>
     * Create a new one if one is not available.
     *
     * @param tableName table name
     * @return a reference to the specified table
     * @throws RuntimeException if there is a problem instantiating the HTable
     */
    public HTableInterface getTable(String tableName) {
        LinkedList<HTableInterface> queue = queueFor(tableName);
        HTableInterface table;
        synchronized (queue) {
            table = queue.poll();
        }
        if (table == null) {
            table = createHTable(tableName);
        }
        // Count the connection only once we actually have one: counting a failed creation would inflate
        // the counters permanently and could make the pool look out of connections.
        incActiveConnectionsCount(tableName);
        return table;
    }

    /**
     * Returns the mean of the values recorded in the table's active-connections histogram, truncated to an
     * int, or 0 when nothing has been recorded for the table yet.
     * <p>
     * NOTE(review): the previous comment stated the mean covers the last 5 minutes; whether that holds
     * depends on the Histogram implementation - confirm.
     */
    private int getTableMeanLoad(String tableName) {
        Histogram histogram = activeTableConnectionsHistograms.get(tableName);
        return histogram == null ? 0 : (int) histogram.mean();
    }

    /**
     * Puts the specified HTable back into the pool.
     * <p>
     * The reference is cached unless the table's queue already holds
     * {@code max(coreTablePoolSize, min(maxTableConnectionsCount, meanLoad))} references, in which case it is
     * released. A table whose name has no queue in this pool (i.e. it was never requested through
     * {@link #getTable(String)}) is released without touching the counters.
     *
     * @param table table
     */
    public void putTable(HTableInterface table) {
        String tableName = Bytes.toString(table.getTableName());
        LinkedList<HTableInterface> queue = tables.get(tableName);
        if (queue == null) {
            // Never handed out by this pool, so it was never counted; just let go of it.
            log.warn(String.format("[HTablePool] table [%s] was not obtained from this pool - releasing it", tableName));
            releaseQuietly(table);
            return;
        }

        // if the table is actively used at the moment, we cache the connections, instead of releasing them
        int capacity = Math.max(coreTablePoolSize,
                Math.min(maxTableConnectionsCount, getTableMeanLoad(tableName)));
        boolean cached = false;
        synchronized (queue) {
            if (queue.size() < capacity) {
                queue.add(table);
                cached = true;
            }
        }
        // Update counters (and release the instance if it was not cached) outside the queue lock.
        releaseTable(tableName, table, !cached);
    }

    /**
     * Must be called to correctly release stale table connection.
     * <p>
     * Decrements the active-connections counters for {@code tableName} and, when {@code releaseConnection}
     * is true, hands the table to the factory for release. Failures are logged, not propagated.
     *
     * @param tableName         name of the table the connection belongs to
     * @param table             table reference being returned
     * @param releaseConnection whether the underlying table instance should be released
     */
    public void releaseTable(String tableName, HTableInterface table, boolean releaseConnection) {
        try {
            decActiveConnectionsCount(tableName);
            if (releaseConnection) {
                // release table instance since we're not reusing it
                this.tableFactory.releaseHTableInterface(table);
            }
        } catch (Exception e) {
            log.error("Failed to release table!", e);
        }
    }

    /**
     * Releases a table instance through the factory, logging (instead of propagating) any failure so that a
     * single bad connection cannot abort the cleanup of the remaining ones.
     */
    private void releaseQuietly(HTableInterface table) {
        try {
            this.tableFactory.releaseHTableInterface(table);
        } catch (Exception e) {
            log.error("Failed to release table!", e);
        }
    }

    /**
     * Creates a new table reference through the table factory, using this pool's configuration.
     *
     * @param tableName table name
     * @return a new table reference
     */
    protected HTableInterface createHTable(String tableName) {
        return this.tableFactory.createHTableInterface(config, Bytes.toBytes(tableName));
    }

    /**
     * Removes pooled HTable connections, when table load goes down.
     * <p>
     * Each table's queue is trimmed down to {@code max(coreTablePoolSize, meanLoad)} references; it is never
     * trimmed below the core pool size.
     */
    private void resizeTablePool() {
        log.info("[HTablePool] cleaning up unused connections...");
        for (String tableName : activeTableConnectionsCount.keySet()) {
            LinkedList<HTableInterface> queue = tables.get(tableName);
            if (queue == null) {
                // counters can exist for names that were only ever passed to releaseTable
                continue;
            }
            int targetSize = Math.max(coreTablePoolSize, getTableMeanLoad(tableName));
            synchronized (queue) {
                int tablePoolSize = queue.size();
                if (tablePoolSize > targetSize) {
                    log.info(String.format("[HTablePool] for [%s] is too big: resizing from %s to %s...", tableName, tablePoolSize, targetSize));
                    for (int excess = tablePoolSize - targetSize; excess > 0; excess--) {
                        HTableInterface table = queue.poll();
                        if (table == null) {
                            break;
                        }
                        releaseQuietly(table);
                    }
                }
            }
        }
    }

    /**
     * Releases all pooled connections of every table, without closing the shared connection.
     */
    public void closeTablePool() {
        int totalClosedTablesCount = 0;
        for (String tableName : tables.keySet()) {
            totalClosedTablesCount += closeTablePool(tableName, false);
        }
        log.info(String.format("[HTablePool] closed %s unused connections!", totalClosedTablesCount));
    }

    /**
     * Closes all the HTable instances , belonging to the given table, in the table pool.
     * <p>
     * Note: this is a 'shutdown' of the given table pool and different from
     * {@link #putTable(HTableInterface)}, that is used to return the table instance to the pool for future
     * re-use.
     *
     * @param tableName table name
     * @return number of pooled references that were closed
     */
    public int closeTablePool(final String tableName) {
        return closeTablePool(tableName, true);
    }

    /**
     * Closes all pooled HTable instances of the given table.
     *
     * @param tableName             table name
     * @param shouldCloseConnection when true, also calls
     *                              {@code HConnectionManager.deleteConnection(config, true)} afterwards
     * @return number of pooled references that were closed
     */
    public int closeTablePool(final String tableName, boolean shouldCloseConnection) {
        int closedTablesCount = 0;
        Queue<HTableInterface> queue = tables.get(tableName);
        if (queue != null) {
            synchronized (queue) {
                log.info(String.format("[HTablePool] %s has %s unused connections!", tableName, queue.size()));
                for (HTableInterface table = queue.poll(); table != null; table = queue.poll()) {
                    releaseQuietly(table);
                    closedTablesCount++;
                }
                log.info(String.format("[HTablePool] closed %s connections to %s", closedTablesCount, tableName));
            }
        }
        if (shouldCloseConnection) {
            HConnectionManager.deleteConnection(this.config, true);
        }
        return closedTablesCount;
    }

    /**
     * Returns the number of idle references currently cached for the given table (0 if the table is unknown).
     * The size is read without locking the queue.
     */
    public int size(String tableName) {
        Queue<HTableInterface> queue = tables.get(tableName);
        return queue != null ? queue.size() : 0;
    }

    /**
     * Returns the total number of idle references cached across all tables. Queue sizes are read without
     * locking.
     */
    public int size() {
        int poolSize = 0;
        for (LinkedList<HTableInterface> tablePool : getTables().values()) {
            poolSize += tablePool.size();
        }
        return poolSize;
    }

    /**
     * Exposes the live map of per-table queues (package-private).
     */
    ConcurrentMap<String, LinkedList<HTableInterface>> getTables() {
        return this.tables;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment