Created
July 9, 2018 19:58
-
-
Save soeirosantos/d972a97b98a4a5a24f39e7861dc016d3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
  A dummy example of how to synchronize access to a KafkaConsumer across threads in Scala.

  "The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the
  application making the call. It is the responsibility of the user to ensure that multi-threaded
  access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException."
  - https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#multithreaded
*/
import java.util.concurrent.locks.ReentrantLock | |
import java.util.{Collections, Optional, Properties} | |
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer} | |
import org.apache.kafka.common.TopicPartition | |
import org.apache.kafka.common.serialization.ByteArrayDeserializer | |
/** Program entry point: builds a [[Principal]] and kicks off the demo. */
object TestConsumerInThread {

  /**
   * Creates the coordinating [[Principal]] and runs its `start()` on the main thread.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val principal = new Principal
    principal.start()
  }
}
/**
 * Creates the shared [[KafkaConsumer]] and the lock that guards every call made on it.
 *
 * `start()` builds the consumer, hands it (together with the lock) to a background
 * [[MyThread]], waits five seconds and then assigns partition 0 of "theTopic" from the
 * calling thread while holding the lock.
 */
class Principal {

  // Fair ordering: the worker releases and immediately re-acquires the lock around every poll,
  // so a non-fair lock could let it barge ahead of a thread already waiting to use the consumer.
  private val lock = new ReentrantLock(true)

  /** The consumer shared with the worker thread; only touch it while holding the lock. */
  var conn: KafkaConsumer[Array[Byte], Array[Byte]] = _

  /**
   * Starts the demo: creates the consumer, launches the polling thread and, five seconds later,
   * assigns the partition under the lock.
   */
  final def start(): Unit = {
    val deserializer = classOf[ByteArrayDeserializer].getName

    val props = new Properties()
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094")
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-local")
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)

    conn = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    new MyThread(conn, lock).start()

    // The lock is deliberately NOT held while sleeping, so the worker really does run
    // concurrently with this thread and the lock below is what keeps the two apart.
    Thread.sleep(5000)
    println("adding partition")

    // Use the Lock API (lock/unlock) rather than `lock.synchronized`: the latter only takes the
    // object's intrinsic monitor and never acquires the ReentrantLock itself.
    lock.lock()
    try {
      conn.assign(Collections.singletonList(new TopicPartition("theTopic", 0)))
      println("partition added")
    } finally {
      lock.unlock()
    }
  }
}

/**
 * Background thread that polls the shared consumer forever.
 *
 * @param consumer the consumer shared with the creating thread
 * @param lock     the lock that must be held around every call on `consumer`
 */
class MyThread(consumer: KafkaConsumer[Array[Byte], Array[Byte]], lock: ReentrantLock) extends Thread {

  /** Loops forever, polling when a partition is assigned and backing off briefly otherwise. */
  override def run(): Unit = {
    println("starting")
    while (true) {
      // Sleep outside the lock so an idle worker neither spins nor blocks the assigning thread.
      if (!pollIfAssigned()) Thread.sleep(100)
    }
  }

  /**
   * Polls the consumer once, under the lock, if it has at least one partition assigned.
   *
   * @return `true` if a poll was issued, `false` if nothing is assigned yet
   */
  private def pollIfAssigned(): Boolean = {
    lock.lock()
    try {
      val assigned = !consumer.assignment().isEmpty
      if (assigned) consumer.poll(100)
      assigned
    } finally {
      lock.unlock()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment