Skip to content

Instantly share code, notes, and snippets.

@zachelrath
Created May 19, 2022 13:52
Show Gist options
  • Save zachelrath/7483e37175bf4d30e4a1c8a4727a234d to your computer and use it in GitHub Desktop.
Save zachelrath/7483e37175bf4d30e4a1c8a4727a234d to your computer and use it in GitHub Desktop.
Simple Pulsar queue consumer
/**
 * Spring configuration that builds a Pulsar client and subscribes a single
 * listener-driven consumer to the configured topic.
 *
 * <p>Only active when the {@code consumer.enabled} property is {@code "true"}.
 * The class implements {@link AutoCloseable} so the client and its consumers
 * can be released on shutdown; Spring is expected to invoke {@link #close()}
 * on {@code AutoCloseable} singleton beans when the application context closes.
 */
@Configuration
@ConditionalOnProperty(name = "consumer.enabled", havingValue = "true")
@Slf4j
public class PulsarConsumerConfiguration implements AutoCloseable {

    private final PulsarClient client;
    private final List<Consumer<byte[]>> consumers;
    private final SimpleMessageListener msgListener;

    /**
     * Builds the Pulsar client and subscribes to the topic.
     *
     * @param pulsarUrl        service URL, read from {@code pulsar.url}
     * @param topicName        topic to consume, read from {@code pulsar.topic.name}
     * @param subscriptionName subscription name, read from {@code pulsar.subscription.name}
     * @param messageListener  listener that receives each message
     * @throws PulsarClientException if the client cannot be built or the subscription fails;
     *                               on a subscription failure the client is closed first
     */
    public PulsarConsumerConfiguration(
            @Value("${pulsar.url}") String pulsarUrl,
            @Value("${pulsar.topic.name}") String topicName,
            @Value("${pulsar.subscription.name}") String subscriptionName,
            SimpleMessageListener messageListener
    ) throws PulsarClientException {
        this.msgListener = messageListener;
        this.client = PulsarClient.builder()
                .serviceUrl(pulsarUrl)
                .build();
        log.info("(CONSUMER) Connected to Pulsar client");

        this.consumers = new ArrayList<>();
        try {
            Consumer<byte[]> consumer = client.newConsumer()
                    .subscriptionName(subscriptionName)
                    .subscriptionType(SubscriptionType.Key_Shared)
                    .topic(topicName)
                    .receiverQueueSize(5)
                    .messageListener(messageListener)
                    .ackTimeout(10, TimeUnit.SECONDS)
                    .subscribe();
            consumers.add(consumer);
        } catch (PulsarClientException e) {
            // The bean will never be registered, so nobody else can close the client:
            // release it here to avoid leaking it.
            try {
                client.close();
            } catch (PulsarClientException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        log.info("(CONSUMER) Subscribed to topic {}", topicName);
    }

    /**
     * Closes every consumer created by this configuration, then the client itself.
     *
     * <p>A failure to close an individual consumer is logged and does not prevent
     * the client from being closed.
     *
     * @throws PulsarClientException if closing the client fails
     */
    @Override
    public void close() throws PulsarClientException {
        try {
            for (Consumer<byte[]> consumer : consumers) {
                try {
                    consumer.close();
                } catch (PulsarClientException e) {
                    log.warn("(CONSUMER) Failed to close consumer", e);
                }
            }
        } finally {
            client.close();
        }
        log.info("(CONSUMER) Closed Pulsar client");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment