Spark HBase integration
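The file below (SampleApp.java) shows both directions of the integration: it reads an HBase table ("mytable") into a JavaPairRDD<ImmutableBytesWritable, Result> with newAPIHadoopRDD and TableInputFormat, counts and filters the rows on the "score" column family and prints them, and then writes a few Put objects back to the table through saveAsHadoopDataset with the old mapred TableOutputFormat.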
package utils;

import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.*;
/**
 * run:
 * spark-submit
 *   --class utils.SampleApp
 *   --master spark://h3:7077
 *   target/dmp-SNAPSHOT-1.0.jar
 *   --run_mode local
 *   --redis_hosts w1
 *   --durations 300
 *   --checkpoint_directory /tmp
 *
 * (the application flags must match the option names registered in createOptions() below)
 */
public class SampleApp {

    public static final String APP_DURATION = "durations";
    public static final String REDIS_HOST = "redis_hosts";
    public static final String LOGS_DIRECTORY = "logs_directory";
    public static final String RUN_MODE = "run_mode";
    public static final String CHECKPOINT_DIRECTORY = "checkpoint_directory";

    public static final Options THE_OPTIONS = createOptions();

    private static Options createOptions() {
        Options options = new Options();
        options.addOption(APP_DURATION, true, "The streaming app's duration");
        options.addOption(REDIS_HOST, true, "The redis host");
        options.addOption(LOGS_DIRECTORY, true, "The directory where logs are written");
        options.addOption(RUN_MODE, true, "The application's running mode");
        options.addOption(CHECKPOINT_DIRECTORY, true, "The checkpoint directory");
        return options;
    }
    public static void main(String[] args) {
        if (args.length == 0) {
            System.err.println("Some parameters are needed");
            System.exit(-1);
        }
        AppConfigure.setFromCommandLineArgs(THE_OPTIONS, args);

        SparkConf conf = new SparkConf().setAppName("Sample App");
        if ("local".equalsIgnoreCase(AppConfigure.getInstance().getRunMode())) {
            conf.setMaster("local[*]");
        }
        JavaSparkContext jsc = new JavaSparkContext(conf);
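
        // ---- Read from HBase ----
        // newAPIHadoopRDD with TableInputFormat yields one (rowkey, Result) pair per HBase row:
        // the ImmutableBytesWritable key holds the row key bytes, the Result holds the row's cells.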
        Configuration hconf = HBaseConfiguration.create();

        // To scan only specific columns (colFam:col) instead of the full table, build a Scan and
        // serialize it into the Hadoop configuration (hconf, not the SparkConf), as sketched below:
        // Scan scan = new Scan();
        // scan.addFamily(Bytes.toBytes("name"));
        // scan.addColumn(Bytes.toBytes("name"), Bytes.toBytes("n1"));

        try {
            // String table = "mytable";
            // hconf.set(TableInputFormat.INPUT_TABLE, table);
            // ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            // String scanToString = Base64.encodeBytes(proto.toByteArray());
            // hconf.set(TableInputFormat.SCAN, scanToString);
            hconf.set(TableInputFormat.INPUT_TABLE, "mytable");

            JavaPairRDD<ImmutableBytesWritable, Result> rdd = jsc.newAPIHadoopRDD(hconf, TableInputFormat.class,
                    ImmutableBytesWritable.class, Result.class);
            System.out.println("total records: " + rdd.count());
            // JavaRDD<Result> result = rdd.map(new Function<Tuple2<ImmutableBytesWritable, Result>, Result>() {
            //     public Result call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
            //         return tuple._2();
            //     }
            // });

            // Build a (key, value) pair from the first cell of each row.
            JavaPairRDD<String, String> myrdd = rdd.mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, String>() {
                public Tuple2<String, String> call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
                    CellScanner cellScanner = tuple._2().cellScanner();
                    while (cellScanner.advance()) {
                        Cell cell = cellScanner.current();
                        // String key = "MYKEY--" + Bytes.toString(CellUtil.cloneRow(cell));
                        // the ImmutableBytesWritable in tuple._1() is in fact the rowkey
                        String key = "MYKEY--" + Bytes.toString(tuple._1().get());
                        String value = Bytes.toString(CellUtil.cloneValue(cell));
                        return new Tuple2<String, String>(key, value);
                    }
                    return null; // rows with no cells would have to be filtered out before myrdd is used
                }
            });
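
            // Keep only the rows that have at least one cell in the "score" column family.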
            JavaPairRDD<ImmutableBytesWritable, Result> score = rdd.filter(new Function<Tuple2<ImmutableBytesWritable, Result>, Boolean>() {
                public Boolean call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
                    CellScanner cellScanner = tuple._2().cellScanner();
                    while (cellScanner.advance()) {
                        Cell cell = cellScanner.current();
                        String colFam = Bytes.toString(CellUtil.cloneFamily(cell));
                        if (colFam.equalsIgnoreCase("score")) {
                            return true;
                        }
                    }
                    return false;
                }
            });
            System.out.println("score records: " + score.count());

            // In local mode these println calls appear on the driver's console; on a cluster they end up in the executor logs.
            score.foreach(new VoidFunction<Tuple2<ImmutableBytesWritable, Result>>() {
                public void call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
                    String key = Bytes.toString(tuple._2().getRow());
                    String value = Bytes.toString(tuple._2().getValue("score".getBytes(), "n1".getBytes()));
                    System.out.println(key + ": \n" + value);
                }
            });

            // rdd.foreach(new VoidFunction<Tuple2<ImmutableBytesWritable, Result>>() {
            //     public void call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
            //         String key = Bytes.toString(tuple._2().getRow());
            //         String value = Bytes.toString(tuple._2().getValue("name".getBytes(), "n1".getBytes()));
            //         System.out.println(key + ": \n" + value);
            //     }
            // });
        } catch (Exception e) {
            e.printStackTrace();
        }
        // ---- Write to HBase ----
        // saveAsHadoopDataset expects a JobConf, so the write path uses the old mapred TableOutputFormat.
        // TableOutputFormat writes the Put from each pair and ignores the ImmutableBytesWritable key,
        // which is why an empty instance is good enough below.
        Configuration writeConf = HBaseConfiguration.create();
        writeConf.set("hbase.zookeeper.property.clientPort", "2181");
        writeConf.set("hbase.zookeeper.quorum", "localhost");

        // The second constructor argument is a class whose containing jar is used as the job's jar.
        JobConf jobConf = new JobConf(writeConf, SampleApp.class);
        jobConf.setOutputFormat(TableOutputFormat.class);
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, "mytable");

        List<String> rawData = Arrays.asList("one", "another");
        JavaPairRDD<ImmutableBytesWritable, Put> data = jsc.parallelize(rawData).mapToPair(new PairFunction<String, ImmutableBytesWritable, Put>() {
            public Tuple2<ImmutableBytesWritable, Put> call(String s) throws Exception {
                // Each value becomes one Put with a random "abc<n>" row key, stored in name:n1.
                Random random = new Random();
                Put p = new Put(Bytes.toBytes("abc" + random.nextInt()));
                p.addColumn("name".getBytes(), "n1".getBytes(), s.getBytes());
                return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(), p);
            }
        });
        data.saveAsHadoopDataset(jobConf);
    }
}
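
SampleApp references an AppConfigure helper that is not included in this gist. The sketch below is a hypothetical stand-in that would satisfy the two calls made above (setFromCommandLineArgs and getInstance().getRunMode()); the GnuParser choice, the "local" default, and the rest of the class are assumptions rather than the original author's code.

package utils;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

// Hypothetical stand-in for the AppConfigure class the gist depends on but does not include.
public class AppConfigure {

    private static AppConfigure instance;

    private final String runMode;

    private AppConfigure(String runMode) {
        this.runMode = runMode;
    }

    // Parse the command line with the Options built in SampleApp and remember the run mode.
    public static void setFromCommandLineArgs(Options options, String[] args) {
        try {
            CommandLine cmd = new GnuParser().parse(options, args);
            instance = new AppConfigure(cmd.getOptionValue(SampleApp.RUN_MODE, "local"));
        } catch (ParseException e) {
            throw new IllegalArgumentException("Could not parse command line arguments", e);
        }
    }

    public static AppConfigure getInstance() {
        return instance;
    }

    public String getRunMode() {
        return runMode;
    }
}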