Working with the new HBase 1.0.0 API from Spark
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.client._
import org.apache.spark.SparkContext
import scala.collection.JavaConversions._

/**
 * Basic CRUD examples with the new HBase 1.0.0 API
 **/
object HBaseNewAPI {

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SparkHBase")

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "master")

    // Creating a Connection is heavyweight; it is thread-safe and is the entry point for all HBase operations
    val conn = ConnectionFactory.createConnection(conf)

    // Obtain an Admin object from the Connection (the equivalent of the old HBaseAdmin)
    val admin = conn.getAdmin

    // Name of the table used in this example
    val userTable = TableName.valueOf("user")

    // Create the `user` table
    val tableDescr = new HTableDescriptor(userTable)
    tableDescr.addFamily(new HColumnDescriptor("basic".getBytes))
    println("Creating table `user`. ")
    if (admin.tableExists(userTable)) {
      admin.disableTable(userTable)
      admin.deleteTable(userTable)
    }
    admin.createTable(tableDescr)
    println("Done!")

    try {
      // Get the `user` table
      val table = conn.getTable(userTable)

      try {
        // Prepare to insert a row with key `id001`
        val p = new Put("id001".getBytes)
        // Specify the column and value for the Put (the old put.add method is deprecated)
        p.addColumn("basic".getBytes, "name".getBytes, "wuchong".getBytes)
        // Submit
        table.put(p)

        // Get a single row
        val g = new Get("id001".getBytes)
        val result = table.get(g)
        val value = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
        println("GET id001 :" + value)

        // Scan the table
        val s = new Scan()
        s.addColumn("basic".getBytes, "name".getBytes)
        val scanner = table.getScanner(s)
        try {
          for (r <- scanner) {
            println("Found row: " + r)
            println("Found value: " + Bytes.toString(r.getValue("basic".getBytes, "name".getBytes)))
          }
        } finally {
          // Make sure the scanner is closed
          scanner.close()
        }

        // Delete a row; the usage pattern is similar to Put
        val d = new Delete("id001".getBytes)
        d.addColumn("basic".getBytes, "name".getBytes)
        table.delete(d)
      } finally {
        if (table != null) table.close()
      }
    } finally {
      conn.close()
    }
  }
}
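To build and run these snippets you need Spark core plus the HBase client, common, and server artifacts on the classpath (the mapreduce TableInputFormat/TableOutputFormat classes used below ship in hbase-server). A minimal sbt sketch follows; the Scala and library versions are assumptions to be matched against your own Spark and HBase installation:

// build.sbt sketch -- version numbers are illustrative, align them with your cluster
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "1.3.1" % "provided",
  "org.apache.hbase" %  "hbase-client" % "1.0.0",
  "org.apache.hbase" %  "hbase-common" % "1.0.0",
  "org.apache.hbase" %  "hbase-server" % "1.0.0"  // brings in TableInputFormat / TableOutputFormat
)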
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.client._

/**
 * Reading from and writing to HBase with Spark
 **/
object SparkOnHBase {

  // Serialize a Scan into the Base64-encoded string expected by TableInputFormat
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SparkOnHBase")

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "master")

    // ======Save RDD to HBase========
    // step 1: JobConf setup
    val jobConf = new JobConf(conf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "user")

    // step 2: rdd mapping to table
    // An HBase table schema typically looks like
    //   *row    cf:col_1    cf:col_2
    // whereas in Spark we work with RDDs of tuples such as (1,"lilei",14), (2,"hanmei",18).
    // We need to turn *RDD[(uid:Int, name:String, age:Int)]* into *RDD[(ImmutableBytesWritable, Put)]*,
    // and the convert function below does exactly that.
    def convert(triple: (Int, String, Int)) = {
      val p = new Put(Bytes.toBytes(triple._1))
      p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
      p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes(triple._3))
      (new ImmutableBytesWritable, p)
    }

    // step 3: read RDD data from somewhere and convert
    val rawData = List((1, "lilei", 14), (2, "hanmei", 18), (3, "someone", 38))
    val localData = sc.parallelize(rawData).map(convert)

    // step 4: use `saveAsHadoopDataset` to save RDD to HBase
    localData.saveAsHadoopDataset(jobConf)
    // =================================

    // ======Load RDD from HBase========
    // use `newAPIHadoopRDD` to load RDD from HBase
    // Read directly from HBase into an RDD[K, V] that Spark can work with

    // Set the table to query
    conf.set(TableInputFormat.INPUT_TABLE, "user")

    // Add a filter condition: age of at least 18
    val scan = new Scan()
    scan.setFilter(new SingleColumnValueFilter("basic".getBytes, "age".getBytes,
      CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(18)))
    conf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val usersRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    val count = usersRDD.count()
    println("Users RDD Count:" + count)
    usersRDD.cache()

    // Iterate over the results and print them
    usersRDD.foreach { case (_, result) =>
      val key = Bytes.toInt(result.getRow)
      val name = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
      val age = Bytes.toInt(result.getValue("basic".getBytes, "age".getBytes))
      println("Row key:" + key + " Name:" + name + " Age:" + age)
    }
    // =================================
  }
}
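The pairs produced by `newAPIHadoopRDD` are Hadoop `ImmutableBytesWritable`/`Result` objects, which are awkward to carry through further Spark transformations. A common follow-up step is to map them into plain Scala tuples right away. The sketch below would slot in immediately after the `newAPIHadoopRDD` call in `main` above; it assumes the same schema as the example (`basic:name`, `basic:age`, integer row keys):

// Sketch: convert the Hadoop types into a plain RDD[(Int, String, Int)],
// so later transformations only deal with ordinary Scala values.
val users = usersRDD.map { case (_, result) =>
  val uid  = Bytes.toInt(result.getRow)
  val name = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
  val age  = Bytes.toInt(result.getValue("basic".getBytes, "age".getBytes))
  (uid, name, age)
}
users.filter(_._3 >= 18).collect().foreach(println)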