Created
March 16, 2011 22:13
-
-
Save dvryaboy/873429 to your computer and use it in GitHub Desktop.
HBase Alerter thing.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.twitter.twadoop.monitoring; | |
import java.io.IOException; | |
import java.util.Collection; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.Future; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.TimeoutException; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.hbase.HBaseConfiguration; | |
import org.apache.hadoop.hbase.HServerInfo; | |
import org.apache.hadoop.hbase.MasterNotRunningException; | |
import org.apache.hadoop.hbase.ZooKeeperConnectionException; | |
import org.apache.hadoop.hbase.client.HBaseAdmin; | |
import org.apache.hadoop.hbase.client.HConnection; | |
import org.apache.hadoop.hbase.client.HConnectionManager; | |
import org.apache.hadoop.hbase.ipc.HMasterInterface; | |
import org.apache.hadoop.hbase.ipc.HRegionInterface; | |
/** | |
* <p>Connect to HBase Master, fetch all region servers, try to | |
* do something on the RSes to ensure they are actually responding. | |
* </p> | |
* | |
*/ | |
// TODO: Make this pick up non-default hbase | |
// TODO: Make timeouts and concurrency level configurable | |
// TODO: Add retries (something like 1 failure is a warning, 3 failures is critical) | |
public class HBaseRSAlerter { | |
final HMasterInterface hMaster; | |
final HBaseAdmin hbaseAdmin; | |
final Configuration conf; | |
static final long TIMEOUT_MS = 5000; | |
static final int CONC_LEVEL = 3; | |
// NOTE: order matters. It corresponds to Nagios status values. | |
enum STATUS { | |
OK, WARNING, CRITICAL | |
} | |
public HBaseRSAlerter(Configuration conf) throws MasterNotRunningException, ZooKeeperConnectionException { | |
this.conf = HBaseConfiguration.addHbaseResources(conf); | |
this.hbaseAdmin = new HBaseAdmin(conf); | |
this.hMaster = hbaseAdmin.getMaster(); | |
} | |
public Map<HServerInfo, Future<STATUS>> checkLiveness() { | |
HConnection conn = null; | |
Map<HServerInfo, Future<STATUS>> statusMap = new HashMap<HServerInfo, Future<STATUS>>(); | |
try { | |
conn = HConnectionManager.getConnection(conf); | |
} catch (ZooKeeperConnectionException e) { | |
System.out.println("Unable to get ZooKeeper connection"); | |
e.printStackTrace(); | |
return statusMap; | |
} | |
Collection<HServerInfo> serverInfos = hMaster.getClusterStatus().getServerInfo(); | |
ExecutorService executor = Executors.newFixedThreadPool(CONC_LEVEL); | |
for (HServerInfo serverInfo : serverInfos) { | |
HRegionInterface hReg = null; | |
try { | |
hReg = conn.getHRegionConnection(serverInfo.getServerAddress()); | |
} catch (IOException e) { | |
System.out.println("Unable to get HRegionConnection to " + serverInfo.getHostnamePort()); | |
statusMap.put(serverInfo, new PrescientFuture<STATUS>(STATUS.WARNING)); | |
continue; | |
} | |
LivenessCheck check = new LivenessCheck(hReg); | |
Future<STATUS> future = executor.submit(check); | |
statusMap.put(serverInfo, future); | |
} | |
return statusMap; | |
} | |
public STATUS run() { | |
if (!hMaster.isMasterRunning()) { | |
System.out.println("Master not running."); | |
return STATUS.CRITICAL; | |
} | |
// overallStatus is max of all statuses. | |
STATUS overallStatus = STATUS.OK; | |
Map<HServerInfo, Future<STATUS>> rsStats = checkLiveness(); | |
if (rsStats.isEmpty()) { | |
return STATUS.CRITICAL; | |
} | |
for (Map.Entry<HServerInfo, Future<STATUS>> entry : rsStats.entrySet()) { | |
STATUS status = STATUS.WARNING; | |
try { | |
status = entry.getValue().get(); | |
if (status != STATUS.OK) { | |
System.out.println(status.toString() + " contacting " + entry.getKey().getHostnamePort()); | |
} | |
} catch (InterruptedException e) { | |
System.out.println("InterruptedException retrieving value for " + entry.getKey().getHostnamePort()); | |
e.printStackTrace(); | |
} catch (ExecutionException e) { | |
System.out.println("ExecutionException retrieving value for " + entry.getKey().getHostnamePort()); | |
e.printStackTrace(); | |
} | |
overallStatus = STATUS.values()[Math.max(overallStatus.ordinal(), status.ordinal())]; | |
} | |
return STATUS.OK; | |
} | |
/** | |
* <p>Perform a liveness check on the given Region Server (currently simply call getOnlineRegions). | |
* If the task times out, return WARNING, otherwise return OK.</p> | |
* <p>We could instead use the built-in timeout capability of Future.get(long, TimeUnit) but the problem there | |
* is that we would be timing out Futures that may not have had a chance to start executing yet, as other, slow | |
* tasks clogged up the executor threads. </p> | |
*/ | |
public static class LivenessCheck implements Callable<STATUS> { | |
final HRegionInterface hReg; | |
public LivenessCheck(HRegionInterface hReg) { | |
this.hReg = hReg; | |
} | |
@Override | |
public STATUS call() { | |
Thread t = new Thread(new Runnable() { | |
@Override | |
public void run() { | |
hReg.getOnlineRegions(); | |
} | |
}); | |
t.setDaemon(true); | |
t.start(); | |
try { | |
t.join(TIMEOUT_MS); | |
} catch (InterruptedException e) {} | |
if (t.isAlive()) { | |
t.interrupt(); | |
return STATUS.WARNING; | |
} | |
return STATUS.OK; | |
} | |
} | |
/** | |
* <p>A mock Future that knows what to return from the get-go.</p> | |
* @param <T> intended result. | |
*/ | |
public static class PrescientFuture<T> implements Future<T> { | |
final T result; | |
public PrescientFuture(T result) { | |
this.result = result; | |
} | |
@Override | |
public boolean cancel(boolean mayInterrupt) { | |
return true; | |
} | |
@Override | |
public T get() throws InterruptedException, ExecutionException { | |
return result; | |
} | |
@Override | |
public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, | |
TimeoutException { | |
return result; | |
} | |
@Override | |
public boolean isCancelled() { | |
return false; | |
} | |
@Override | |
public boolean isDone() { | |
return true; | |
} | |
} | |
public static void main(String[] args) { | |
HBaseRSAlerter alerter = null; | |
try { | |
alerter = new HBaseRSAlerter(new Configuration()); | |
} catch (MasterNotRunningException e) { | |
System.out.println("Unable to connect to Master: MasterNotRunning"); | |
e.printStackTrace(); | |
System.exit(STATUS.CRITICAL.ordinal()); | |
} catch (ZooKeeperConnectionException e) { | |
System.out.println("Unable to connect to Master: ZooKeeperConnectionException"); | |
e.printStackTrace(); | |
System.exit(STATUS.CRITICAL.ordinal()); | |
} | |
System.exit(alerter.run().ordinal()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment