Skip to content

Instantly share code, notes, and snippets.

@soeirosantos
Created July 9, 2018 19:58
Show Gist options
  • Save soeirosantos/d972a97b98a4a5a24f39e7861dc016d3 to your computer and use it in GitHub Desktop.
Save soeirosantos/d972a97b98a4a5a24f39e7861dc016d3 to your computer and use it in GitHub Desktop.
/**
It's a dummy example about how to synchronize the KafkaConsumer accross threads in scala.
"The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the
application making the call. It is the responsibility of the user to ensure that multi-threaded
access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException."
- https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#multithreaded
*/
import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, Optional, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer
object TestConsumerInThread {
def main(args: Array[String]): Unit = {
new Principal().start()
}
}
class Principal {
private val lock = new ReentrantLock()
var conn: KafkaConsumer[Array[Byte], Array[Byte]] = _
final def start(): Unit = lock.synchronized {
val props = new Properties()
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094")
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-local")
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
conn = new KafkaConsumer(props)
new MyThread(conn, lock).start()
Thread.sleep(5000)
println("adding partition")
lock.synchronized {
conn.assign(Collections.singletonList(new TopicPartition("theTopic", 0)))
println("partition added")
}
}
}
class MyThread(consumer: KafkaConsumer[Array[Byte], Array[Byte]], lock: ReentrantLock) extends Thread {
override def run(): Unit = {
println("starting")
while (true) {
lock.synchronized {
if (!consumer.assignment().isEmpty) {
consumer.poll(100)
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment