-
-
Save fancellu/f78e11b1808db2727d76 to your computer and use it in GitHub Desktop.
import java.util | |
import org.apache.kafka.clients.consumer.KafkaConsumer | |
import scala.collection.JavaConverters._ | |
object ConsumerExample extends App { | |
import java.util.Properties | |
val TOPIC="test" | |
val props = new Properties() | |
props.put("bootstrap.servers", "localhost:9092") | |
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") | |
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") | |
props.put("group.id", "something") | |
val consumer = new KafkaConsumer[String, String](props) | |
consumer.subscribe(util.Collections.singletonList(TOPIC)) | |
while(true){ | |
val records=consumer.poll(100) | |
for (record<-records.asScala){ | |
println(record) | |
} | |
} | |
} |
object ProducerExample extends App { | |
import java.util.Properties | |
import org.apache.kafka.clients.producer._ | |
val props = new Properties() | |
props.put("bootstrap.servers", "localhost:9092") | |
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") | |
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") | |
val producer = new KafkaProducer[String, String](props) | |
val TOPIC="test" | |
for(i<- 1 to 50){ | |
val record = new ProducerRecord(TOPIC, "key", s"hello $i") | |
producer.send(record) | |
} | |
val record = new ProducerRecord(TOPIC, "key", "the end "+new java.util.Date) | |
producer.send(record) | |
producer.close() | |
} |
Above example works fine simply with below dependencies:-
"org.apache.kafka" % "kafka_2.11" % "1.1.1"
scala 2.11
spark 2.3.0
Ran it through spark shell with below commands
spark2-shell --packages org.apache.kafka:kafka_2.11:1.1.1
"org.apache.kafka" % "kafka_2.11" % "1.1.1" this worked fine for me.
name := "myApp"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.0"
libraryDependencies ++= Seq(
"org.apache.bahir" %% "spark-streaming-twitter" % sparkVersion,
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" % "spark-sql_2.11" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
"org.apache.kafka" % "kafka_2.11" % "1.1.1"
)
I'm using eclipse IDE getting below error. Any idea what is the issue.
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Hi, I am new to Kafka (using for only 3 days now) but I think
[error] Modules were resolved with conflicting cross-version suffixes in {file:/home/edureka/Desktop/code_practicals/}code_practicals:
[error] org.apache.kafka:kafka _2.11, _2.10
this means that you should use scala version 2.11 instead of 2.10
correct me if I am wrong...
A full kafkasteams example here
https://github.com/fancellu/kafkastreams
remove all and add following in build.sbt file
resolvers += Resolver.bintrayRepo("cakesolutions", "maven")
libraryDependencies += "net.cakesolutions" %% "scala-kafka-client" % "1.1.1"