Kafka Producer - Consumer code example
===========================================KAFKA Simple PRODUCER======================================
package kafkaHome;

import java.util.Properties;

import org.apache.kafka.clients.producer.*;
public class SimpleProducer {

    public static void main(String[] args) throws Exception {

        String topicName = "ftokafkaTopicNew"; // could also be taken from args[0]
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.236.130:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        // While testing the streaming consumer, keep both loops below, or keep only this
        // first loop, use a 60-second window sliding every 5 seconds, and run the
        // producer a few times.
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), "Value1"));
        }
        /*
        for (int j = 0; j < 10; j++) {
            producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(j), "Value2"));
        }
        */
        // producer.flush();
        producer.close();
        System.out.println("SimpleProducer Completed.");
    }
}
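
The commented-out producer.flush() and RecordMetadata lines above hint at checking broker acknowledgements. Below is a minimal sketch of a synchronous send and a callback-based send, assuming the same broker address and topic as above; the ProducerWithAck class name is illustrative, not part of the original gist.

package kafkaHome;

import java.util.Properties;

import org.apache.kafka.clients.producer.*;

public class ProducerWithAck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.236.130:9092"); // same broker as above
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record =
                new ProducerRecord<>("ftokafkaTopicNew", "Key1", "value");

        // Synchronous send: get() blocks until the broker acknowledges the write,
        // so the metadata below holds the record's real partition and offset.
        RecordMetadata metaData = producer.send(record).get();
        System.out.println("acked: " + metaData.topic() + "-"
                + metaData.partition() + "-" + metaData.offset());

        // Asynchronous send: the callback fires once the ack (or an error) arrives.
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata md, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.println("acked: " + md.offset());
                }
            }
        });

        producer.close(); // close() also flushes any buffered records
    }
}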
===========================================spark Streaming consuming kafka data====================
==scala script=== | |
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.sql._

case class Word(word: String)

val sparkConf = new SparkConf().setAppName("ftokafkatospark").setMaster("local[2]")
val topicSet = "ftokafkaTopicNew".split(",").toSet
val kafkaParams = Map[String, String]("bootstrap.servers" -> "localhost:9092")
val ssc = new StreamingContext(sparkConf, Seconds(5))

// Direct (receiver-less) Kafka stream, windowed to 10 seconds, sliding every 5 seconds.
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet).window(Seconds(10), Seconds(5))

// Each RDD in the windowed stream holds (key, value) pairs; count the values with Spark SQL.
messages.foreachRDD { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  val wordDF = rdd.map(x => Word(x._2)).toDF()
  wordDF.registerTempTable("word_table")
  val wordCountResult = sqlContext.sql("select word, count(*) from word_table group by word")
  wordCountResult.show()
}

ssc.start()
ssc.awaitTermination()
===========================================KAFKA Simple CONSUMER====================================== | |
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class SimpleConsumer {

    public static void main(String[] args) throws Exception {
        String topicName = "ftokafkaTopicNew";
        String groupName = "RG";

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.236.130:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", groupName);
        //props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //RebalanceListener rebalanceListener = new RebalanceListener(consumer);
        consumer.subscribe(Arrays.asList(topicName));//, rebalanceListener);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                //if (records.count() == 0) { break; } // for unit testing
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.topic() + "-" + record.partition() + "-" + record.offset());
                    //rebalanceListener.addOffsets(record.topic(), record.partition(), record.offset());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
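
The commented-out enable.auto.commit and RebalanceListener lines above hint at manual offset management. Below is a minimal sketch of the manual-commit variant, assuming the same broker, topic, and consumer group as above; the ManualCommitConsumer class name is illustrative, not part of the original gist.

package kafkaHome;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.236.130:9092"); // same broker as above
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "RG");
        // Disable auto-commit so offsets advance only after records are processed.
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("ftokafkaTopicNew"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                }
                // Commit the offsets returned by this poll; a crash before this
                // line means the batch is re-read on restart (at-least-once).
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}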