Spark: reading from and writing to HBase
package learn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
import scala.Tuple3;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
/**
 * Write data to and read data from HBase with Spark.
 */
public class SparkHbaseIntegration {

    /**
     * Converts each (id, name, age) triple of the RDD into a
     * Tuple2<ImmutableBytesWritable, Put> so it can be written through TableOutputFormat.
     */
    public static final PairFunction<Tuple3<Integer, String, Integer>, ImmutableBytesWritable, Put> CONVERT_FUN =
            new PairFunction<Tuple3<Integer, String, Integer>, ImmutableBytesWritable, Put>() {
                public Tuple2<ImmutableBytesWritable, Put> call(Tuple3<Integer, String, Integer> data) throws Exception {
                    Put put = new Put(Bytes.toBytes(data._1().toString())); // row key
                    put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes(data._2()));
                    put.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes(data._3()));
                    return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), put);
                }
            };
    /**
     * Serializes a Scan into a Base64 string so it can be passed to TableInputFormat
     * through the Hadoop Configuration (TableInputFormat.SCAN).
     */
    public static String scanToString(Scan scan) {
        try {
            ClientProtos.Scan s = ProtobufUtil.toScan(scan);
            return Base64.encodeBytes(s.toByteArray());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("HBase writer").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "localhost");

        // ========== Write to HBase ==========
        JobConf jobConf = new JobConf(conf, SparkHbaseIntegration.class);
        jobConf.setOutputFormat(TableOutputFormat.class);
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, "user");

        List<Tuple3<Integer, String, Integer>> data = Arrays.asList(
                new Tuple3<Integer, String, Integer>(1, "linzhili", 2),
                new Tuple3<Integer, String, Integer>(2, "zhangsichao", 33));
        JavaPairRDD<ImmutableBytesWritable, Put> rddData = sc.parallelize(data).mapToPair(SparkHbaseIntegration.CONVERT_FUN);
        rddData.saveAsHadoopDataset(jobConf);
        // ========== Read from HBase ==========
        conf.set(TableInputFormat.INPUT_TABLE, "user"); // table name
        // A Scan can be used to add filter conditions, or to restrict the scan to a single column of a column family:
        /*************************************************
        Scan scan = new Scan();
        scan.setFilter(new SingleColumnValueFilter("basic".getBytes(), "age".getBytes(),
                CompareFilter.CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(18))); // keep rows where age >= 18
        scan.addFamily(Bytes.toBytes("basic"));
        scan.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age")); // scan only basic:age
        conf.set(TableInputFormat.SCAN, SparkHbaseIntegration.scanToString(scan));
        ***************************************************/
        JavaPairRDD<ImmutableBytesWritable, Result> rdd = sc.newAPIHadoopRDD(conf, TableInputFormat.class,
                ImmutableBytesWritable.class, Result.class);
        System.out.println("total records: " + rdd.count());

        rdd.foreach(new VoidFunction<Tuple2<ImmutableBytesWritable, Result>>() {
            public void call(Tuple2<ImmutableBytesWritable, Result> tuple2) throws Exception {
                Result result = tuple2._2();
                String key = Bytes.toString(result.getRow());
                String name = Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name")));
                int age = Bytes.toInt(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("age")));
                System.out.println("Key: " + key + "\tName: " + name + "\tAge: " + age);
            }
        });

        sc.stop();
    }
}
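
The write can also be verified outside of Spark with a plain HBase client scan of the same "user" table and "basic" column family. The class below is a minimal sketch and not part of the original gist; it assumes the HBase 1.x client API (ConnectionFactory, Table, ResultScanner) and the same ZooKeeper settings used above.

package learn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * Hypothetical helper (not part of the gist): scans the "user" table written by
 * SparkHbaseIntegration with the plain HBase client, bypassing Spark entirely.
 */
public class UserTableChecker {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181"); // same settings as the Spark job
        conf.set("hbase.zookeeper.quorum", "localhost");

        // try-with-resources closes the connection, table, and scanner automatically
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("user"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                String key = Bytes.toString(result.getRow());
                String name = Bytes.toString(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("name")));
                int age = Bytes.toInt(result.getValue(Bytes.toBytes("basic"), Bytes.toBytes("age")));
                System.out.println("Key: " + key + "\tName: " + name + "\tAge: " + age);
            }
        }
    }
}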