Skip to content

Instantly share code, notes, and snippets.

@tspannhw
Created May 11, 2022 14:03
Show Gist options
  • Select an option

  • Save tspannhw/49be2bd60612749c939b912907a48a9f to your computer and use it in GitHub Desktop.

Select an option

Save tspannhw/49be2bd60612749c939b912907a48a9f to your computer and use it in GitHub Desktop.
Multiple Partition Test
bin/pulsar-client produce --key "test1" "persistent://public/default/ack-2" -m "Test this thing 4" -n 25
public void consumeFromPulsarAsync() throws Exception {
PulsarClient client = PulsarClient.
builder()
.serviceUrl("pulsar://pulsar1:6650")
.build();
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("persistent://public/default/ack-2")
.consumerName("pulsar-consumer-id-" + Math.random())
.subscriptionName("pulsar-subscription-id-" + Math.random())
.subscriptionType(SubscriptionType.Shared).subscribe();
consumer.receiveAsync().thenCompose((msg) -> {
log.info("consumeFromPulsarAsync() consumed msg :: {}", msg.getValue());
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
return null;
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment