Skip to content

Instantly share code, notes, and snippets.

@darrenfu
Last active April 16, 2019 23:29
Show Gist options
  • Save darrenfu/0f5a81e837f6b6cea0a8b532c55c7c1b to your computer and use it in GitHub Desktop.
Save darrenfu/0f5a81e837f6b6cea0a8b532c55c7c1b to your computer and use it in GitHub Desktop.
// Instead of adding argument --files /apache/hbase/conf/hbase-site.xml, just copy `hbase-site.xml` to /apache/spark/conf/
//spark-shell --executor-memory 20g --executor-cores 1 --num-executors 10 --driver-memory 8g --queue $HADOOP_QUEUE --jars $(for x in `ls -1 /apache/hbase/lib/*.jar`; do readlink -f $x; done | paste -s | sed -e 's/\t/,/g')
import java.io.{DataOutputStream, ByteArrayOutputStream}
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
// Encodes an HBase Scan as a Base64 string: the scan is converted to its
// protobuf message via ProtobufUtil.toScan, and the message bytes are then
// Base64-encoded.
//   scan: the Scan to serialize
//   returns: the Base64 text of the scan's protobuf bytes
def convertScanToString(scan: Scan): String = {
  Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray)
}
// Shared HBase configuration used for every table read below.
val conf = HBaseConfiguration.create()
// Uncomment to set the ZooKeeper quorum explicitly instead of relying on the loaded configuration.
//conf.set("hbase.zookeeper.quorum", "hostname1:2181,hostname2:2181,...")
// One Scan instance is reused for all tables.
val scan = new Scan()
// Scanner caching of 500 (presumably rows fetched per round trip — confirm against the HBase version in use).
scan.setCaching(500)
// Turn off block caching for this scan.
scan.setCacheBlocks(false)
// Names of the tables to count; each entry is passed as TableInputFormat.INPUT_TABLE.
// The `...` below is a placeholder: replace it with the table-name string literals.
val tables = Array(
...
)
// Counts the rows of every table in `tables` and prints one "<table> -> <count>" line per table.
//
// The same scan is used for all tables, so it is serialized and stored in the
// configuration once, before the loop, rather than on every iteration.
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
tables.foreach { t =>
  // Point the shared configuration at the current table before building its RDD.
  conf.set(TableInputFormat.INPUT_TABLE, t)
  val rowCount = sc
    .newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    .count()
  // println rather than print: without the trailing newline the results of
  // successive tables run together on a single line.
  println(s"${t} -> ${rowCount}")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment