@fancellu
Last active June 28, 2023 15:35
Kafka Producer/Consumer Example in Scala
import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object ConsumerExample extends App {
  val TOPIC = "test"

  // Connection, deserialization, and consumer-group settings
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("group.id", "something")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList(TOPIC))

  // Poll forever, printing each record as it arrives
  while (true) {
    // poll(Duration) replaces the deprecated poll(long); it needs kafka-clients 2.0 or later
    val records = consumer.poll(Duration.ofMillis(100))
    for (record <- records.asScala) {
      println(record)
    }
  }
}

object ProducerExample extends App {
  import java.util.Properties
  import org.apache.kafka.clients.producer._

  val TOPIC = "test"

  // Connection and serialization settings
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

  val producer = new KafkaProducer[String, String](props)

  // Send 50 messages, all with the same key
  for (i <- 1 to 50) {
    val record = new ProducerRecord(TOPIC, "key", s"hello $i")
    producer.send(record)
  }

  // Send a final timestamped message
  val record = new ProducerRecord(TOPIC, "key", "the end " + new java.util.Date)
  producer.send(record)

  // close() waits for outstanding sends to complete before releasing resources
  producer.close()
}

@sasikumarkbt

I'm using the Eclipse IDE and getting the error below. Any idea what the issue is?
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.

@HasnainMG

Hi, I am new to Kafka (I have only been using it for 3 days), but I think this error:
[error] Modules were resolved with conflicting cross-version suffixes in {file:/home/edureka/Desktop/code_practicals/}code_practicals:
[error] org.apache.kafka:kafka _2.11, _2.10

means that you should use Scala version 2.11 instead of 2.10.
Correct me if I am wrong...
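
One way to act on that reading, as a rough sketch only: in sbt, this conflict usually means the project's scalaVersion and the suffix on the Kafka artifact disagree, so pinning both to 2.11 should clear it. The version numbers below are illustrative assumptions, not taken from the gist or from the build that produced the error.

```scala
// build.sbt (sketch; the versions here are assumed for illustration)
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // %% appends the Scala binary suffix (_2.11 here), keeping it in step with scalaVersion
  "org.apache.kafka" %% "kafka" % "0.10.2.2",
  // kafka-clients is a plain Java artifact with no Scala suffix, so it takes a single %
  "org.apache.kafka" % "kafka-clients" % "0.10.2.2"
)
```

If some other dependency still pulls in a _2.10 artifact transitively, the same error would reappear, so it is worth checking the resolved dependency tree as well.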

@fancellu
Author

A full Kafka Streams example is here:
https://github.com/fancellu/kafkastreams
