@AtlasPilotPuppy
Last active August 3, 2016 14:21
Accessing HBase from Apache Spark
import org.apache.spark.SparkContext
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
val sc = new SparkContext("local", "Simple App")
// Build an HBase configuration that points the scan at the given table
val hbaseConfiguration = (tableName: String) => {
  val hbaseConfiguration = HBaseConfiguration.create()
  hbaseConfiguration.set(TableInputFormat.INPUT_TABLE, tableName)
  hbaseConfiguration
}

// Create an RDD of (row key, Result) pairs over the HBase table
val tableRDD = (table: String) => {
  val rdd = new NewHadoopRDD(
    sc,
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result],
    hbaseConfiguration(table)
  )
  rdd
}
val rdd = tableRDD("table-with-data")
/** Convert the latest value of a single column to a string **/
val columns = rdd
  .map(tuple => tuple._2)
  .map(result => result.getColumn("Column Family".getBytes(), "ColumnQualifier".getBytes()))
  .map(keyValues => {
    // keep only the cell with the most recent timestamp, then decode its value
    new String(keyValues.asScala.reduceLeft {
      (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
    }.getValue.map(_.toChar))
  })
/** Another way to get multiple columns at once */
val cols = rdd.map(tuple => tuple._2).map(result =>
  result.getColumn("CF".getBytes, "CQ1".getBytes) ::
  result.getColumn("CF".getBytes, "CQ2".getBytes) ::
  result.getColumn("CF".getBytes, "CQ3".getBytes) :: Nil)
// Drop rows where any of the requested columns is missing
val filtered = cols.filter(row => row.map(_.length > 0).reduce((acc, tip) => acc & tip))
/** convert all values to strings **/
val row_vals = filtered.map(row => row.map(ele => new String(ele.head.getValue.map(_.toChar))))
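
For a quick sanity check, a handful of the decoded rows can be pulled back to the driver. This is only an illustrative snippet against the placeholder table and column names used above:

// Print a small sample of the decoded rows (avoid collect() on large tables)
row_vals.take(5).foreach(row => println(row.mkString(", ")))
// Count how many rows survived the filter
println("rows with all requested columns: " + row_vals.count())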
@rain1024

Where is your HBase address pointing: to a local or a remote server?

@AtlasPilotPuppy
Author

@rain1024
I am reading from HBase on the same cluster, so it reads directly from HDFS.
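
If the table lived on a remote cluster instead, the same TableInputFormat setup works as long as the configuration points at that cluster's ZooKeeper quorum. A minimal sketch, with placeholder host names and the default client port (not values from this gist):

// Hypothetical remote setup: direct the scan at another cluster's ZooKeeper quorum
val remoteConf = HBaseConfiguration.create()
remoteConf.set("hbase.zookeeper.quorum", "zk-host-1,zk-host-2,zk-host-3") // placeholder hosts
remoteConf.set("hbase.zookeeper.property.clientPort", "2181")
remoteConf.set(TableInputFormat.INPUT_TABLE, "table-with-data")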

@lnicalo

lnicalo commented Jul 6, 2015

How would you do the same in Python with PySpark?

I have written this piece of code, but it does not work:

keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

data_conf = {
    "hbase.mapreduce.inputtable": "raw_signals",
    "hbase.mapreduce.scan.columns": "family1:col1 family2:col2",
}

hbase_rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter=keyConv,
        valueConverter=valueConv,
        conf=data_conf)
