Skip to content

Instantly share code, notes, and snippets.

@apple-corps
Last active June 18, 2016 19:36
Show Gist options
  • Save apple-corps/b0efa4ff6ff4a7c3c8bb56767d0b6877 to your computer and use it in GitHub Desktop.
Save apple-corps/b0efa4ff6ff4a7c3c8bb56767d0b6877 to your computer and use it in GitHub Desktop.
KafkaToHbase.java
import org.apache.commons.net.util.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.Logging;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.*;
import org.apache.spark.streaming.Duration;
import org.apache.spark.Accumulator;
import java.security.*;
import java.util.*;
import com.cloudera.spark.hbase.JavaHBaseContext;
public class KafkaToHbase {
public static final String TABLE_NAME = "ADMd5";
public static final byte[] COL_FAM = Bytes.toBytes("a");
public static final byte[] u = Bytes.toBytes("u");
public static final byte[] A_Rank = Bytes.toBytes("ar");
public static final byte[] g = Bytes.toBytes("g");
public static final byte[] location = Bytes.toBytes("l");
public static final byte[] added_time = Bytes.toBytes("at");
public static void main(String args[]) {
if (args.length != 3) {
System.out.println("table name not provided");
System.exit(-1);
}
String tableName = args[0];
String topicName = args[1];
String brokerList = args[2];
JavaSparkContext jsc = new JavaSparkContext();
JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(1000));
Set<String> topicSet = Collections.singleton(topicName);
Map<String, String> kafkaParamsMap = new HashMap<String, String>();
kafkaParamsMap.put("metadata.broker.list", brokerList);
kafkaParamsMap.put("rebalance.backoff.ms", "3000");
kafkaParamsMap.put("auto.offset.reset", "smallest");
JavaPairInputDStream<byte[], byte[]> kafkaDirectDStream = KafkaUtils
Configuration hbConf = HBaseConfiguration.create();
hbConf.set("hbase.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181");
hbConf.addResource(new Path("/etc/hbase/conf/core-site.xml"));
hbConf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
JavaHBaseContext hbContext = new JavaHBaseContext(jsc, hbConf);
JavaPairDStream<byte[],HBaseADProtos.ADBytes> protobufPairStream = kafkaDirectDStream.mapValues(new ProtoFunction());
JavaDStream<HBaseADProtos.ADBytes> valueStream = protobufPairStream.transform(new GetValuesFunction());
hbContext.streamBulkPut(valueStream, tableName, new PutFunction(), true);
jssc.start();
jssc.awaitTermination();
}
public static class PutFunction implements Function<HBaseADProtos.ADBytes, Put> {
public Put call( HBaseADProtos.ADBytes adb ) throws Exception {
byte[] urlByteArr = adb.getUrl().toByteArray();
Put put = new Put(mkRowKey(urlByteArr));
put.add(COL_FAM, url, urlByteArr);
if (adb.hasA_Rank()) put.add(COL_FAM, A_Rank, adb.getA_Rank().toByteArray());
if (adb.hasG()) put.add(COL_FAM, g, adb.getG().toByteArray());
if (adb.hasLocation()) put.add(COL_FAM, location, adb.getLocation().toByteArray());
if (adb.hasAddedTime()) put.add(COL_FAM, added_time, adb.getAddedTime().toByteArray());
return put;
}
}
public static class ProtoFunction implements Function<byte[], HBaseADProtos.ADBytes> {
public HBaseADProtos.ADBytes call(byte[] b) throws Exception {
return HBaseADProtos.ADBytes.parseFrom(b);
}
}
public static class GetValuesFunction implements Function<JavaPairRDD<byte[],HBaseADProtos.ADBytes>, JavaRDD<HBaseADProtos.ADBytes>> {
public JavaRDD<HBaseADProtos.ADBytes> call( JavaPairRDD<byte[],HBaseADProtos.ADBytes> inputPairRDD) throws Exception {
return inputPairRDD.values();
}
}
private static byte[] mkRowKey(byte[] strUrl) {
try {
return Bytes.toBytes(Base64.encodeBase64String(MessageDigest.getInstance("MD5").digest(strUrl)).trim());
}
catch (java.security.NoSuchAlgorithmException e) {
System.out.println(String.format("mkRowKey error %s", e.getMessage()));
}
return null;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment