Skip to content

Instantly share code, notes, and snippets.

@tankala
Last active April 24, 2018 15:08
Show Gist options
  • Save tankala/dbaa64cd73f7c803e487f59433e08175 to your computer and use it in GitHub Desktop.
Save tankala/dbaa64cd73f7c803e487f59433e08175 to your computer and use it in GitHub Desktop.
Consumer Group Initiation in Node.js
var kafka = require('kafka-node');
exports.initiateKafkaConsumerGroup = function (groupName, topicName) {
var options = {
// connect directly to kafka broker (instantiates a KafkaClient)
kafkaHost: '127.0.0.1:9092',
groupId: groupName,
autoCommit: true,
autoCommitIntervalMs: 5000,
sessionTimeout: 15000,
fetchMaxBytes: 10 * 1024 * 1024, // 10 MB
// An array of partition assignment protocols ordered by preference. 'roundrobin' or 'range' string for
// built ins (see below to pass in custom assignment protocol)
protocol: ['roundrobin'],
// Offsets to use for new groups other options could be 'earliest' or 'none'
// (none will emit an error if no offsets were saved) equivalent to Java client's auto.offset.reset
fromOffset: 'latest',
// how to recover from OutOfRangeOffset error (where save offset is past server retention)
// accepts same value as fromOffset
outOfRangeOffset: 'earliest'
};
var consumerGroup = new kafka.ConsumerGroup(options, topicName);
consumerGroup.on('message', function (message) {
console.log('Message: ' + message);
//TODO: You can write your code or call messageProcesser function
});
consumerGroup.on('error', function onError(error) {
console.error(error);
});
console.log('Started Consumer for topic "' + topicName + '" in group "' + groupName + '"');
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment