Skip to content

Instantly share code, notes, and snippets.

@catchin
Last active June 14, 2019 08:02
Show Gist options
  • Save catchin/5f22926497392560dd90e41293dcc690 to your computer and use it in GitHub Desktop.
Save catchin/5f22926497392560dd90e41293dcc690 to your computer and use it in GitHub Desktop.
reactor-kafka issue with doOnConsumer
/**
 * Exercises [KafkaReceiver.doOnConsumer] against an embedded, single-partition
 * broker that hosts the topic `some-topic`.
 */
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = ["some-topic"], controlledShutdown = true)
class KafkaReceiverTest {
    // Injected by Spring; not referenced by the test below.
    @Autowired
    protected lateinit var kafkaProperties: KafkaProperties

    // Injected by Spring; supplies the bootstrap address of the embedded broker.
    @Autowired
    protected lateinit var embeddedKafka: EmbeddedKafkaBroker

    /**
     * Creates a receiver subscribed to `some-topic`, asks it to run a callback
     * with the underlying consumer, and fails if the callback does not report
     * back within a bounded time.
     */
    @Test
    internal fun test_receiver() {
        val consumerConfig = mapOf<String, Any>(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to embeddedKafka.brokersAsString,
            // A topic subscription uses consumer-group management, which needs a group id.
            ConsumerConfig.GROUP_ID_CONFIG to "kafka-receiver-test",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
        )

        // Key type is Int so that it agrees with the configured IntegerDeserializer.
        val receiverOptions = ReceiverOptions.create<Int, String>(consumerConfig)
            .subscription(listOf("some-topic"))

        // Bounded wait: a callback that is never invoked fails the test instead of
        // hanging the build indefinitely.
        val callbackRan = KafkaReceiver.create(receiverOptions)
            .doOnConsumer {
                println("I was here")
                true
            }
            .block(java.time.Duration.ofSeconds(10))

        check(callbackRan == true) { "doOnConsumer callback was not invoked" }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment