Accessing HBase from Apache Spark
import org.apache.spark.SparkContext
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

val sc = new SparkContext("local", "Simple App")

// Build an HBase configuration that points TableInputFormat at the given table.
val hbaseConfiguration = (tableName: String) => {
  val conf = HBaseConfiguration.create()
  conf.set(TableInputFormat.INPUT_TABLE, tableName)
  conf
}

// Wrap an HBase table in an RDD of (row key, Result) pairs.
val tableRDD = (table: String) => {
  new NewHadoopRDD(
    sc,
    classOf[TableInputFormat],
    classOf[ImmutableBytesWritable],
    classOf[Result],
    hbaseConfiguration(table)
  )
}

val rdd = tableRDD("table-with-data")

/** Convert columns to strings: keep the most recent version of a single column. **/
val columns = rdd
  .map(tuple => tuple._2)
  .map(result => result.getColumn("Column Family".getBytes(), "ColumnQualifier".getBytes()))
  .map(keyValues => new String(
    keyValues.asScala.reduceLeft { (a, b) =>
      if (a.getTimestamp > b.getTimestamp) a else b
    }.getValue.map(_.toChar)))

/** Another way to get multiple columns. */
val cols = rdd
  .map(tuple => tuple._2)
  .map(result =>
    result.getColumn("CF".getBytes, "CQ1".getBytes) ::
    result.getColumn("CF".getBytes, "CQ2".getBytes) ::
    result.getColumn("CF".getBytes, "CQ3".getBytes) :: Nil)

// Remove rows where any of the requested columns is missing.
val filtered = cols.filter(row => row.map(_.length > 0).reduce((acc, tip) => acc && tip))

/** Convert all values to strings. **/
val row_vals = filtered.map(row => row.map(ele => new String(ele.head.getValue.map(_.toChar))))
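To sanity-check the pipeline, the resulting RDD can be materialized on the driver. A minimal sketch, assuming the row_vals RDD from above and a table small enough to sample (the sample size of 10 is arbitrary):

// Count the rows that survived filtering and print a small sample.
// take(10) keeps the result on the driver small; collect() on a large
// table would pull every row into driver memory.
println(s"rows after filtering: ${row_vals.count()}")
row_vals.take(10).foreach(row => println(row.mkString(", ")))
sc.stop()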
@rain1024
I am reading from HBase on the same cluster, so it reads directly from HDFS.
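If HBase were on a remote cluster instead, the same configuration would presumably need to point at that cluster's ZooKeeper quorum. A minimal sketch on top of the gist's hbaseConfiguration helper; the host names and port below are placeholders, not real addresses:

// Point the HBase client at a remote ZooKeeper ensemble (placeholder hosts).
val remoteConf = hbaseConfiguration("table-with-data")
remoteConf.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com")
remoteConf.set("hbase.zookeeper.property.clientPort", "2181")
// remoteConf can then be passed to NewHadoopRDD in place of hbaseConfiguration(table).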
How would you do the same in Python with PySpark?
I have written this piece of code, but it does not work:
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
data_conf = {
"hbase.mapreduce.inputtable": "raw_signals",
"hbase.mapreduce.scan.columns": "family1:col1 family2:col2",
}
hbase_rdd = sc.newAPIHadoopRDD(
"org.apache.hadoop.hbase.mapreduce.TableInputFormat",
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"org.apache.hadoop.hbase.client.Result",
keyConverter=keyConv,
valueConverter=valueConv,
conf=data_conf)
Where is your HBase running, locally or on a remote server?