Skip to content

Instantly share code, notes, and snippets.

@lxneng
Forked from MallikarjunaG/StreamCatsToHBase.py
Created August 20, 2020 14:21

Revisions

  1. @MallikarjunaG MallikarjunaG created this gist Apr 28, 2017.
    36 changes: 36 additions & 0 deletions StreamCatsToHBase.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,36 @@
    1: import sys
    2: import json
    3: from pyspark import SparkContext
    4: from pyspark.streaming import StreamingContext
    5:
    6:
    def SaveRecord(rdd):
        """Write one micro-batch of JSON text lines to the HBase table ``cats``.

        Every element of *rdd* is treated as a JSON document (as text) with an
        ``"id"`` field.  The stringified id is used as the row key and the raw,
        unmodified JSON text is stored under column family ``cfamily``,
        qualifier ``cats_json``.

        The function deliberately keeps a single positional parameter: that is
        the form in which this file passes it to ``foreachRDD``.

        Args:
            rdd: RDD of JSON text lines for the current batch.

        Note:
            Lines that are not valid JSON, or that lack ``"id"``, are not
            filtered here; the resulting error surfaces from the save job.
        """
        # Nothing arrived during this batch interval: skip launching an output
        # job that would have no rows to write.
        if rdd.isEmpty():
            return

        host = 'sparkmaster.example.com'
        table = 'cats'
        keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
        valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
        conf = {
            "hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable",
        }

        def to_put(line):
            # Parse once per record (the id is needed for both key and value).
            row_key = str(json.loads(line)["id"])
            # Value layout handed to StringListToPutConverter:
            # [row key, column family, qualifier, cell value].
            return (row_key, [row_key, "cfamily", "cats_json", line])

        datamap = rdd.map(to_put)
        datamap.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)
    19:
    def main(argv):
        """Stream text lines from a TCP socket and hand each batch to SaveRecord.

        Args:
            argv: Full argument vector: ``[script, hostname, port]``.

        Raises:
            SystemExit: With status -1 when the argument count is wrong.
            ValueError: When the port argument is not an integer.
        """
        if len(argv) != 3:
            # Usage errors belong on stderr; sys.exit is used because the bare
            # ``exit`` helper is injected by ``site`` and may be absent.
            sys.stderr.write("Usage: StreamCatsToHBase.py <hostname> <port>\n")
            sys.exit(-1)

        # Validate the port before any Spark resources are created so a bad
        # argument fails immediately.
        hostname = argv[1]
        port = int(argv[2])

        sc = SparkContext(appName="StreamCatsToHBase")
        ssc = StreamingContext(sc, 1)  # second argument is the batch duration
        lines = ssc.socketTextStream(hostname, port)
        lines.foreachRDD(SaveRecord)

        ssc.start()  # Start the computation
        ssc.awaitTermination()  # Wait for the computation to terminate


    if __name__ == "__main__":
        main(sys.argv)