Skip to content

Instantly share code, notes, and snippets.

@shibd
Last active July 24, 2024 13:36
Show Gist options
  • Save shibd/82b77eccc8145ac2c1784aca97a29c24 to your computer and use it in GitHub Desktop.
Save shibd/82b77eccc8145ac2c1784aca97a29c24 to your computer and use it in GitHub Desktop.
If a subscription has many consumers and each consumer individually acknowledges messages that were received by the other consumers, the scenario becomes very CPU-intensive.
/**
 * Reproduction driver: many consumers on one Shared subscription, each individually
 * acknowledging the messages that were received by every other consumer.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Connect to the broker at {@code pulsar://localhost:6650} and create a non-batching
 *       producer on a uniquely named topic.</li>
 *   <li>Subscribe {@code numConsumers} consumers to the same Shared subscription.</li>
 *   <li>Publish {@code numMessages} messages asynchronously and flush.</li>
 *   <li>Let every consumer poll up to {@code receiveAttemptsPerConsumer} times, recording
 *       the {@link MessageId} of whatever it receives.</li>
 *   <li>Have every consumer acknowledge, one by one, the ids recorded by all <em>other</em>
 *       consumers.</li>
 *   <li>Close consumers, producer and client.</li>
 * </ol>
 *
 * @param args unused
 * @throws PulsarClientException if any Pulsar client operation fails
 */
public static void main(String[] args) throws PulsarClientException {
    final String subName = "test";
    final String topicName = "persistent://public/default/testAckMessageWithMultipleConsumers-" + UUID.randomUUID();
    final int numConsumers = 200;
    final int numMessages = 20000;
    final int receiveAttemptsPerConsumer = 100;

    // try-with-resources closes the producer first and the client last, even when a step
    // below throws. NOTE(review): consumers that are still open at that point are expected
    // to be released by the client's close() -- confirm against the PulsarClient Javadoc.
    try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
         Producer<byte[]> producer = pulsarClient
                 .newProducer()
                 .topic(topicName)
                 .enableBatching(false)
                 .create()) {

        List<Consumer<byte[]>> consumers = new ArrayList<>(numConsumers);
        // messageIds.get(i) holds the ids received by consumers.get(i).
        List<List<MessageId>> messageIds = new ArrayList<>(numConsumers);
        for (int i = 0; i < numConsumers; i++) {
            Consumer<byte[]> consumer = pulsarClient
                    .newConsumer()
                    .topic(topicName)
                    .consumerName("consumer-" + i)
                    // 10000 s for both delays; presumably chosen so nothing is redelivered
                    // while the run is in progress.
                    .negativeAckRedeliveryDelay(10000, TimeUnit.SECONDS)
                    .ackTimeout(10000, TimeUnit.SECONDS)
                    .subscriptionName(subName)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();
            consumers.add(consumer);
            messageIds.add(new ArrayList<>());
        }

        // Send numMessages messages; flush() below waits for the pending async sends.
        for (int i = 0; i < numMessages; i++) {
            producer.newMessage()
                    .value(("Hello Pulsar - " + i).getBytes(java.nio.charset.StandardCharsets.UTF_8))
                    .sendAsync();
            log.info("send msg: " + i);
        }
        producer.flush();

        // Each consumer polls a bounded number of times. Both loop conditions stop the
        // whole receive phase as soon as every published message has been seen, instead
        // of only leaving the inner loop.
        int totalMessagesReceived = 0;
        for (int i = 0; i < numConsumers && totalMessagesReceived < numMessages; i++) {
            Consumer<byte[]> consumer = consumers.get(i);
            List<MessageId> receivedIds = messageIds.get(i);
            for (int j = 0; j < receiveAttemptsPerConsumer && totalMessagesReceived < numMessages; j++) {
                Message<byte[]> message = consumer.receive(10, TimeUnit.MILLISECONDS);
                if (message != null) {
                    log.info(consumer.getConsumerName() + "-receive msg: "
                            + new String(message.getValue(), java.nio.charset.StandardCharsets.UTF_8));
                    receivedIds.add(message.getMessageId());
                    totalMessagesReceived++;
                }
            }
        }
        log.info("start ack message: " + totalMessagesReceived);

        // Each consumer acks messages received by others (never its own); this is the
        // cross-consumer individual-ack pattern under investigation.
        for (int i = 0; i < numConsumers; i++) {
            Consumer<byte[]> consumer = consumers.get(i);
            for (int j = 0; j < numConsumers; j++) {
                if (i != j) {
                    for (MessageId messageId : messageIds.get(j)) {
                        consumer.acknowledge(messageId);
                    }
                }
            }
        }

        // Normal-path shutdown keeps the original order: consumers here, then producer and
        // client via try-with-resources.
        for (Consumer<byte[]> consumer : consumers) {
            consumer.close();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment