Created
June 4, 2015 10:35
-
-
Save akhld/a4e46bdc832218486acb to your computer and use it in GitHub Desktop.
Spark Streaming with HBase
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// For every batch of `aggregatedStream`, merges the batch's (rowKey, count) pairs with
// the totals already stored in the HBase table "global_aggregate" (column CF:aggregate),
// writes the merged totals back, and emits them as the batch RDD. When the table is not
// yet available it is created and the batch is written as-is.
val push_hbase = aggregatedStream.transform(rdd => {
  val hbaseTableName = "global_aggregate"
  val hbaseColumnFamily = "CF"
  val hbaseColumnName = "aggregate"

  // Creates the HBase confs (shared by the table scan and the table write).
  val hconf = HBaseConfiguration.create()
  hconf.set("hbase.zookeeper.quorum", "sigmoid-machine1,sigmoid-machine2,sigmoid-machine3,sigmoid-machine4")
  hconf.set("hbase.zookeeper.property.clientPort", "2181")
  hconf.set("hbase.defaults.for.version.skip", "true")
  hconf.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName)
  hconf.setClass("mapreduce.job.outputformat.class", classOf[TableOutputFormat[String]], classOf[OutputFormat[String, Mutation]])

  // Turns one (rowKey, total) pair into the (key, Put) record expected by TableOutputFormat.
  // Kept as a function value (not a local def) so the Spark closure only captures the two
  // column-name strings and nothing from the enclosing scope.
  val toPut: ((String, Int)) => (ImmutableBytesWritable, Put) = { case (rowKey, total) =>
    val record = new Put(Bytes.toBytes(rowKey))
    record.add(Bytes.toBytes(hbaseColumnFamily), Bytes.toBytes(hbaseColumnName), Bytes.toBytes(total.toString))
    (new ImmutableBytesWritable, record)
  }

  // The admin handle is only needed for the availability check / table creation.
  // It is opened once per batch, so it must be closed here or connections leak.
  val admin = new HBaseAdmin(hconf)
  val tableAvailable =
    try {
      val available = admin.isTableAvailable(hbaseTableName)
      if (!available) {
        // Let's create the table; the batch is pushed into it below.
        val tabledesc = new HTableDescriptor(TableName.valueOf(hbaseTableName))
        tabledesc.addFamily(new HColumnDescriptor(hbaseColumnFamily))
        admin.createTable(tabledesc)
      }
      available
    } finally {
      admin.close()
    }

  if (tableAvailable) {
    // Table already exists, so read the stored totals back and update them.
    hconf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
    hconf.set(TableInputFormat.SCAN_COLUMNS, s"$hbaseColumnFamily:$hbaseColumnName $hbaseColumnFamily:batch")

    // rdd.sparkContext is used instead of the outer StreamingContext so that the DStream
    // closure does not capture a non-serializable object (required for checkpointing).
    val check = rdd.sparkContext
      .newAPIHadoopRDD(hconf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
      .flatMap { case (key, row) =>
        // The scan also returns rows that only carry CF:batch; getValue is null for those,
        // so they are skipped instead of failing with a NullPointerException.
        Option(row.getValue(Bytes.toBytes(hbaseColumnFamily), Bytes.toBytes(hbaseColumnName))).map { raw =>
          (Bytes.toString(key.get()), Bytes.toString(raw).toInt)
        }
      }

    // Union the stream data with the previous HBase data. Cached because the RDD is
    // evaluated twice (the save below and the downstream consumer); without caching the
    // second evaluation would re-read the table that was just updated and add the batch again.
    val jdata = (check ++ rdd).reduceByKey(_ + _).cache()
    jdata.map(toPut).saveAsNewAPIHadoopDataset(hconf)
    jdata
  } else {
    // Freshly created table: nothing to merge with, write the batch as-is.
    rdd.map(toPut).saveAsNewAPIHadoopDataset(hconf)
    rdd
  }
})
push_hbase.count().print()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment