Last active
May 7, 2018 22:17
-
-
Save timotta/28f8d78634df0a7c54f61fc34c00563e to your computer and use it in GitHub Desktop.
HBaseRegions, with the method `online`, which returns only the regions that respond without errors
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import org.apache.hadoop.hbase.HRegionInfo | |
| import org.apache.hadoop.hbase.HBaseConfiguration | |
| import org.apache.hadoop.hbase.HConstants | |
| import org.apache.hadoop.hbase.client.ConnectionFactory | |
| import org.apache.hadoop.hbase.util.Bytes | |
| import org.apache.hadoop.hbase.filter.PrefixFilter | |
| import org.apache.hadoop.hbase.TableName | |
| import org.apache.hadoop.hbase.client.Scan | |
| import scala.util.Try | |
| import scala.collection.JavaConverters._ | |
| import org.apache.hadoop.hbase.client.Get | |
class HBaseRegions(zookeeper: String, znode: String) extends Serializable {

  /**
   * Returns only the regions of `table` that answer a probe read on `family`
   * without error.
   *
   * @param table  fully qualified table name as stored in hbase:meta
   * @param family column family used for the probe Get
   * @return the subset of the table's regions whose probe Get succeeded
   */
  def online(table: String, family: String): Array[HRegionInfo] = {
    val regions = tableRegions(table)
    println(s"HBaseRegions: found ${regions.size} regions for $table")
    val online = filterOnlines(regions, table, family)
    println(s"HBaseRegions: only ${online.size} are ok $table")
    online
  }

  /**
   * Scans hbase:meta for the HRegionInfo entries of `table`.
   * Meta row keys have the form "table,startKey,timestamp...", hence the ","
   * start row and the PrefixFilter; "Z" sorts after "," so the stop row
   * covers every region row of this table.
   */
  private def tableRegions(table: String): Array[HRegionInfo] = {
    val familyMeta = Bytes.toBytes("info")
    val columnMeta = Bytes.toBytes("regioninfo")
    val scan = new Scan()
      .setStartRow(Bytes.toBytes(table + ","))
      .setStopRow(Bytes.toBytes(table + "Z"))
      .addColumn(familyMeta, columnMeta)
      .setFilter(new PrefixFilter(Bytes.toBytes(table + ",")))
    val connection = connectionToScan()
    try {
      val metaTable = connection.getTable(TableName.valueOf("hbase:meta"))
      try {
        val scanner = metaTable.getScanner(scan)
        try {
          // Materialize before closing the scanner: asScala is lazy.
          scanner.asScala
            .map(result => HRegionInfo.parseFrom(result.getValue(familyMeta, columnMeta)))
            .toArray
        } finally {
          Try(scanner.close()) // was leaked in the original
        }
      } finally {
        Try(metaTable.close()) // was leaked in the original
      }
    } finally {
      Try(connection.close())
    }
  }

  /**
   * Keeps only the regions for which a lightweight probe Get succeeds.
   * The probe uses a connection with short timeouts and few retries
   * (see connectionToGet) so dead regions fail fast.
   */
  private def filterOnlines(regions: Array[HRegionInfo], table: String, family: String): Array[HRegionInfo] = {
    val familyTest = Bytes.toBytes(family)
    val columnTest = Bytes.toBytes("test")
    val connection = connectionToGet()
    try {
      regions.filter { info =>
        // Probe with the region's start key; the first region of a table has
        // an empty start key, so fall back to its end key.
        val startKey = info.getStartKey()
        val key = if (startKey.nonEmpty) startKey else info.getEndKey()
        // A single-region table has empty start AND end keys, and Get rejects
        // an empty row — probe with an arbitrary key in that case.
        val probeKey = if (key.nonEmpty) key else Bytes.toBytes("0")
        val get = new Get(probeKey)
        get.addColumn(familyTest, columnTest)
        // Renamed from `table`, which shadowed the String parameter.
        val regionTable = connection.getTable(info.getTable)
        try {
          Try(regionTable.get(get)).isSuccess
        } finally {
          Try(regionTable.close()) // was leaked in the original, once per region
        }
      }
    } finally {
      Try(connection.close())
    }
  }

  /**
   * Connection with generous timeouts, suited to the full meta scan.
   */
  private def connectionToScan() = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeper)
    hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znode)
    hbaseConf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "600000")
    hbaseConf.set(HConstants.HBASE_RPC_TIMEOUT_KEY, "3600000")
    hbaseConf.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, "600000")
    ConnectionFactory.createConnection(hbaseConf)
  }

  /**
   * Connection with aggressive timeouts and only 2 retries, so that a probe
   * against a broken region returns quickly instead of hanging the job.
   */
  private def connectionToGet() = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeper)
    hbaseConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znode)
    hbaseConf.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "1000")
    hbaseConf.set(HConstants.HBASE_CLIENT_PAUSE, "100")
    hbaseConf.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2")
    ConnectionFactory.createConnection(hbaseConf)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Use it when saving to HBase with Spark, to avoid the job breaking on bad regions: