Last active
June 18, 2016 19:36
-
-
Save apple-corps/b0efa4ff6ff4a7c3c8bb56767d0b6877 to your computer and use it in GitHub Desktop.
KafkaToHbase.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.cloudera.spark.hbase.JavaHBaseContext;
import java.security.*;
import java.util.*;
import kafka.serializer.DefaultDecoder;
import org.apache.commons.net.util.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.Accumulator;
import org.apache.spark.Logging;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.*;
public class KafkaToHbase { | |
public static final String TABLE_NAME = "ADMd5"; | |
public static final byte[] COL_FAM = Bytes.toBytes("a"); | |
public static final byte[] u = Bytes.toBytes("u"); | |
public static final byte[] A_Rank = Bytes.toBytes("ar"); | |
public static final byte[] g = Bytes.toBytes("g"); | |
public static final byte[] location = Bytes.toBytes("l"); | |
public static final byte[] added_time = Bytes.toBytes("at"); | |
public static void main(String args[]) { | |
if (args.length != 3) { | |
System.out.println("table name not provided"); | |
System.exit(-1); | |
} | |
String tableName = args[0]; | |
String topicName = args[1]; | |
String brokerList = args[2]; | |
JavaSparkContext jsc = new JavaSparkContext(); | |
JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(1000)); | |
Set<String> topicSet = Collections.singleton(topicName); | |
Map<String, String> kafkaParamsMap = new HashMap<String, String>(); | |
kafkaParamsMap.put("metadata.broker.list", brokerList); | |
kafkaParamsMap.put("rebalance.backoff.ms", "3000"); | |
kafkaParamsMap.put("auto.offset.reset", "smallest"); | |
JavaPairInputDStream<byte[], byte[]> kafkaDirectDStream = KafkaUtils | |
Configuration hbConf = HBaseConfiguration.create(); | |
hbConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181"); | |
hbConf.addResource(new Path("/etc/hbase/conf/core-site.xml")); | |
hbConf.addResource(new Path("/etc/hbase/conf/hbase-site.xml")); | |
JavaHBaseContext hbContext = new JavaHBaseContext(jsc, hbConf); | |
JavaPairDStream<byte[],HBaseADProtos.ADBytes> protobufPairStream = kafkaDirectDStream.mapValues(new ProtoFunction()); | |
JavaDStream<HBaseADProtos.ADBytes> valueStream = protobufPairStream.transform(new GetValuesFunction()); | |
hbContext.streamBulkPut(valueStream, tableName, new PutFunction(), true); | |
jssc.start(); | |
jssc.awaitTermination(); | |
} | |
public static class PutFunction implements Function<HBaseADProtos.ADBytes, Put> { | |
public Put call( HBaseADProtos.ADBytes adb ) throws Exception { | |
byte[] urlByteArr = adb.getUrl().toByteArray(); | |
Put put = new Put(mkRowKey(urlByteArr)); | |
put.add(COL_FAM, url, urlByteArr); | |
if (adb.hasA_Rank()) put.add(COL_FAM, A_Rank, adb.getA_Rank().toByteArray()); | |
if (adb.hasG()) put.add(COL_FAM, g, adb.getG().toByteArray()); | |
if (adb.hasLocation()) put.add(COL_FAM, location, adb.getLocation().toByteArray()); | |
if (adb.hasAddedTime()) put.add(COL_FAM, added_time, adb.getAddedTime().toByteArray()); | |
return put; | |
} | |
} | |
public static class ProtoFunction implements Function<byte[], HBaseADProtos.ADBytes> { | |
public HBaseADProtos.ADBytes call(byte[] b) throws Exception { | |
return HBaseADProtos.ADBytes.parseFrom(b); | |
} | |
} | |
public static class GetValuesFunction implements Function<JavaPairRDD<byte[],HBaseADProtos.ADBytes>, JavaRDD<HBaseADProtos.ADBytes>> { | |
public JavaRDD<HBaseADProtos.ADBytes> call( JavaPairRDD<byte[],HBaseADProtos.ADBytes> inputPairRDD) throws Exception { | |
return inputPairRDD.values(); | |
} | |
} | |
private static byte[] mkRowKey(byte[] strUrl) { | |
try { | |
return Bytes.toBytes(Base64.encodeBase64String(MessageDigest.getInstance("MD5").digest(strUrl)).trim()); | |
} | |
catch (java.security.NoSuchAlgorithmException e) { | |
System.out.println(String.format("mkRowKey error %s", e.getMessage())); | |
} | |
return null; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment