Skip to content

Instantly share code, notes, and snippets.

@timotta
Last active May 7, 2018 22:17
Show Gist options
  • Save timotta/28f8d78634df0a7c54f61fc34c00563e to your computer and use it in GitHub Desktop.
Save timotta/28f8d78634df0a7c54f61fc34c00563e to your computer and use it in GitHub Desktop.
HBaseRegions: a helper class whose `online` method returns only the regions of a table that currently respond without error
import org.apache.hadoop.hbase.HRegionInfo
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.HConstants
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.filter.PrefixFilter
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Scan
import scala.util.Try
import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.client.Get
/**
 * Lists the regions of an HBase table (read from `hbase:meta`) and probes each
 * one with a cheap `Get` to find out which regions currently answer requests.
 *
 * @param zookeeper value stored under `HConstants.ZOOKEEPER_QUORUM`
 * @param znode     value stored under `HConstants.ZOOKEEPER_ZNODE_PARENT`
 */
class HBaseRegions(zookeeper: String, znode: String) extends Serializable {

  /**
   * Return only the regions that are returning without error from hbase.
   *
   * @param table  name of the table whose regions are listed and probed
   * @param family column family used by the probe `Get` (column qualifier "test")
   * @return the regions of `table` for which the probe `Get` succeeded
   */
  def online(table: String, family: String): Array[HRegionInfo] = {
    val regions = tableRegions(table)
    println(s"HBaseRegions: found ${regions.length} regions for $table")
    val online = filterOnlines(regions, table, family)
    println(s"HBaseRegions: only ${online.length} are ok $table")
    online
  }

  /**
   * Reads the `info:regioninfo` cell of every `hbase:meta` row whose key starts
   * with "<table>," and parses it into an [[HRegionInfo]].
   *
   * The scanner, the meta table handle and the connection are all closed before
   * returning; close failures are ignored (best effort).
   */
  private def tableRegions(table: String): Array[HRegionInfo] = {
    val familyMeta = Bytes.toBytes("info")
    val columnMeta = Bytes.toBytes("regioninfo")
    val rowPrefix = Bytes.toBytes(table + ",")
    val scan = new Scan()
      .setStartRow(rowPrefix)
      .setStopRow(Bytes.toBytes(table + "Z"))
      .addColumn(familyMeta, columnMeta)
      .setFilter(new PrefixFilter(rowPrefix))
    val connection = connectionToScan()
    try {
      val metaTable = connection.getTable(TableName.valueOf("hbase:meta"))
      try {
        val scanner = metaTable.getScanner(scan)
        try {
          // toArray forces the whole scan while the scanner is still open.
          scanner.asScala
            .map(result => HRegionInfo.parseFrom(result.getValue(familyMeta, columnMeta)))
            .toArray
        } finally {
          Try(scanner.close())
        }
      } finally {
        Try(metaTable.close())
      }
    } finally {
      Try(connection.close())
    }
  }

  /**
   * Keeps the regions for which a single `Get` of `family:test` completes
   * without throwing. Any non-fatal failure while probing a region (building
   * the request, obtaining the table handle, or the `Get` itself) marks that
   * region as not online instead of aborting the whole check.
   *
   * `table` is unused: the table is taken from each region's own info.
   */
  private def filterOnlines(
      regions: Array[HRegionInfo],
      table: String,
      family: String): Array[HRegionInfo] = {
    val familyTest = Bytes.toBytes(family)
    val columnTest = Bytes.toBytes("test")
    val connection = connectionToGet()

    def isOnline(info: HRegionInfo): Boolean = Try {
      val get = new Get(probeKey(info))
      get.addColumn(familyTest, columnTest)
      val regionTable = connection.getTable(info.getTable)
      // Close the per-region table handle even when the Get fails.
      try regionTable.get(get) finally Try(regionTable.close())
    }.isSuccess

    try {
      regions.filter(isOnline)
    } finally {
      Try(connection.close())
    }
  }

  /**
   * Picks a row key that is served by `info`'s region.
   *
   * A non-empty start key is inclusive, so it is used directly. The first
   * region of a table has an empty start key, which is not a valid `Get` row;
   * its end key must not be used either, because the end key is exclusive and
   * belongs to the NEXT region. A single zero byte is the smallest non-empty
   * key and therefore sorts inside the first region.
   * NOTE(review): if a table is split exactly at the key {0x00}, the probe for
   * its first region lands in the second region - confirm this is acceptable.
   */
  private def probeKey(info: HRegionInfo): Array[Byte] = {
    val startKey = info.getStartKey
    if (startKey.nonEmpty) startKey else Array[Byte](0)
  }

  /** Configuration shared by both connections: ZooKeeper quorum and parent znode. */
  private def baseConfiguration() = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeper)
    hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znode)
    hbaseConf
  }

  /**
   * Connection used to scan `hbase:meta`, configured with large timeouts
   * (presumably milliseconds - confirm against the HBase client docs).
   */
  private def connectionToScan() = {
    val hbaseConf = baseConfiguration()
    hbaseConf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "600000")
    hbaseConf.set(HConstants.HBASE_RPC_TIMEOUT_KEY, "3600000")
    hbaseConf.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, "600000")
    ConnectionFactory.createConnection(hbaseConf)
  }

  /**
   * Connection used for the per-region probes, configured with a short
   * operation timeout, a short pause and only 2 retries so that an
   * unresponsive region fails quickly.
   */
  private def connectionToGet() = {
    val hbaseConf = baseConfiguration()
    hbaseConf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "1000")
    hbaseConf.set(HConstants.HBASE_CLIENT_PAUSE, "100")
    hbaseConf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2")
    ConnectionFactory.createConnection(hbaseConf)
  }
}
@timotta
Copy link
Author

timotta commented May 7, 2018

I use it to save to HBase from Spark without the job failing because of a few bad regions:

    // Probe the table once on the driver and ship the healthy regions to the executors.
    val regions = new HBaseRegions(zookeeper, znode)
    val onlineRegions = regions.online(table, family)
    val onlineRegionsBroad = sc.broadcast(onlineRegions)

    // Keep only the records whose row key falls inside a region that answered the probe.
    val toSave = values.filter {
      case (_, p) => onlineRegionsBroad.value.exists(_.containsRow(p.getRow()))
    }

    // Save the filtered RDD; saving `values` here would ignore the filter above.
    toSave.saveAsNewAPIHadoopDataset(hbaseConf)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment