Last active
July 24, 2024 13:36
-
-
Save shibd/82b77eccc8145ac2c1784aca97a29c24 to your computer and use it in GitHub Desktop.
If a subscription has many consumers and each consumer individually acknowledges messages from other consumers, this scenario is very CPU resource-intensive.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) throws PulsarClientException { | |
final String subName = "test"; | |
final String topicName = "persistent://public/default/testAckMessageWithMultipleConsumers-" + UUID.randomUUID(); | |
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(); | |
@Cleanup | |
Producer<byte[]> producer = pulsarClient | |
.newProducer() | |
.topic(topicName) | |
.enableBatching(false) | |
.create(); | |
int numConsumers = 200; | |
List<Consumer<byte[]>> consumers = new ArrayList<>(numConsumers); | |
List<List<MessageId>> messageIds = new ArrayList<>(numConsumers); | |
for (int i = 0; i < numConsumers; i++) { | |
Consumer<byte[]> consumer = pulsarClient | |
.newConsumer() | |
.topic(topicName) | |
.consumerName("consumer-" + i) | |
.negativeAckRedeliveryDelay(10000, TimeUnit.SECONDS) | |
.ackTimeout(10000, TimeUnit.SECONDS) | |
.subscriptionName(subName) | |
.subscriptionType(SubscriptionType.Shared) | |
.subscribe(); | |
consumers.add(consumer); | |
messageIds.add(new ArrayList<>()); | |
} | |
// Send 10,000 messages | |
for (int i = 0; i < 20000; i++) { | |
producer.newMessage().value(("Hello Pulsar - " + i).getBytes()).sendAsync(); | |
log.info("send msg: " + i); | |
} | |
producer.flush(); | |
// Each consumer receives messages | |
int totalMessagesReceived = 0; | |
for (int i = 0; i < numConsumers; i++) { | |
for (int j = 0; j < 100; j++) { | |
Consumer<byte[]> consumer = consumers.get(i); | |
Message<byte[]> message = consumer.receive(10, TimeUnit.MILLISECONDS); | |
if (message != null) { | |
log.info(consumer.getConsumerName() + "-receive msg: " + new String(message.getValue())); | |
messageIds.get(i).add(message.getMessageId()); | |
totalMessagesReceived++; | |
} | |
if (totalMessagesReceived >= 20000) { | |
break; | |
} | |
} | |
} | |
log.info("start ack message: " + totalMessagesReceived); | |
// Each consumer acks messages received by others | |
for (int i = 0; i < numConsumers; i++) { | |
Consumer<byte[]> consumer = consumers.get(i); | |
for (int j = 0; j < numConsumers; j++) { | |
if (i != j) { | |
for (MessageId messageId : messageIds.get(j)) { | |
consumer.acknowledge(messageId); | |
} | |
} | |
} | |
} | |
for (Consumer<byte[]> consumer : consumers) { | |
consumer.close(); | |
} | |
producer.close(); | |
pulsarClient.close(); | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment