Last active
March 2, 2019 21:38
-
-
Save chetkhatri/8261ccba21c517cdf2ffef7281cc51c3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.chetan.poc.hbase | |
/** | |
* Created by chetan on 24/1/17. | |
*/ | |
import org.apache.spark._ | |
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName} | |
import org.apache.hadoop.hbase.mapreduce.TableInputFormat | |
import org.apache.hadoop.hbase.util.Bytes | |
import org.apache.hadoop.hbase.client._ | |
import org.apache.hadoop.hbase.client.ConnectionFactory | |
import scala.collection.JavaConverters._ | |
object IncrementalJob { | |
val APP_NAME: String = "SparkHbaseJob" | |
var HBASE_DB_HOST: String = null | |
var HBASE_TABLE: String = null | |
var HBASE_COLUMN_FAMILY: String = null | |
def main(args: Array[String]) { | |
// Initializing HBASE Configuration variables | |
HBASE_DB_HOST="127.0.0.1" | |
HBASE_TABLE="university" | |
HBASE_COLUMN_FAMILY="emp" | |
// setting spark application | |
val sparkConf = new SparkConf().setAppName(APP_NAME) | |
//initialize the spark context | |
val sparkContext = new SparkContext(sparkConf) | |
val conf = HBaseConfiguration.create() | |
conf.set(TableInputFormat.INPUT_TABLE, HBASE_TABLE) | |
val connection = ConnectionFactory.createConnection(conf) | |
val table = connection.getTable(TableName.valueOf(Bytes.toBytes(HBASE_TABLE))) | |
def printRow(result : Result) = { | |
val cells = result.rawCells(); | |
print( Bytes.toString(result.getRow) + " : " ) | |
for(cell <- cells){ | |
val col_name = Bytes.toString(CellUtil.cloneQualifier(cell)) | |
val col_value = Bytes.toString(CellUtil.cloneValue(cell)) | |
val col_timestamp = cell.getTimestamp() | |
print("(%s,%s,%s) ".format(col_name, col_value, col_timestamp)) | |
} | |
println() | |
} | |
var get = new Get(Bytes.toBytes("cs01")) | |
var result = table.get(get) | |
printRow(result) | |
// Scan - HBase Table | |
val scan = table.getScanner(new Scan()) | |
scan.asScala.foreach(result => { | |
printRow(result) | |
}) | |
table.close() | |
connection.close() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi, can you drop the pom file for this too?