-
-
Save vaibhav93/59f29fb289559c20b97cffd240e3333a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Kafka = require('node-rdkafka');
var producer = require('./producer');

// Topic the consumer subscribes to once it reports ready.
var topicName = 'test_partitions';

// Settings handed to the KafkaConsumer constructor.
var consumerSettings = {
  // Uncomment to have the client emit debug output through 'event.log'.
  //'debug': 'all',
  'metadata.broker.list': 'gc-mskafka1002.pp-devqa-ms-thirdparty.us-central1.gcp.dev.paypalinc.com:9092',
  'group.id': 'node-rdkafka-consumer-flow-example',
  // Auto-commit is off; this script commits offsets itself from its 'data' handler.
  'enable.auto.commit': false,
  'socket.keepalive.enable': true,
  'reconnect.backoff.jitter.ms': 500
};

var consumer = new Kafka.KafkaConsumer(consumerSettings);
// Prints client log events (only produced when the 'debug' setting is enabled).
function printConsumerLog(log) {
  console.log(log);
}

// Prints every error event reported by the consumer, preceded by a marker line.
function printConsumerError(err) {
  console.error('Error from consumer');
  console.error(err);
}

consumer.on('event.log', printConsumerLog);
consumer.on('event.error', printConsumerError);
// Number of messages received so far; the 'data' handler increments it and
// uses it to decide when to commit.
var counter = 0;
// Commit once every `numMessages` received messages (1 = after every message).
var numMessages = 1;

// When the client reports ready: flag the consumer, subscribe to the topic and
// request the first message.
consumer.on('ready', function (arg) {
  consumer.isReady = true;
  consumer.subscribe([topicName]);
  console.log('consumer ready.' + JSON.stringify(arg));
  // NOTE(review): this is the only consume() request in the script and it asks
  // for a single message. Nothing re-issues the request after that message (or
  // if none arrives), so confirm whether continuous consumption was intended.
  consumer.consume(1);
});

// A top-level `if (consumer.isReady) { commit(); consume(1); }` used to follow
// here. It was evaluated once at load time, before connect() is called and
// therefore before the 'ready' handler could set `isReady`, so its body could
// never run. It has been removed as dead code.
// Handles one consumed message: counts it, commits its offset when the counter
// hits a multiple of `numMessages`, then prints the payload and its parity.
function handleMessage(message) {
  counter += 1;

  var commitDue = counter % numMessages === 0;
  if (commitDue) {
    console.log('calling commit');
    consumer.commit(message);
  }

  // Disabled experiment: re-publish each message to one of two topics based on
  // the parity of its value.
  // var topicName1 = 'test_partitions';
  // var topicName2 = 'test_topic2';
  // var partition = -1;
  // var buff = Buffer.from(message.value.toString());
  // switch (message.value % 2) {
  //   case 0:
  //     producer.producer.produce(topicName1, partition, buff, counter.toString());
  //     break;
  //   case 1:
  //     producer.producer.produce(topicName2, partition, buff, counter.toString());
  // }

  // Output the actual message contents.
  // NOTE(review): this throws if `message.value` is ever null - confirm that
  // cannot happen for this topic.
  //console.log(JSON.stringify(message));
  console.log(message.value.toString());

  // `%` coerces the value through its string form to a number, so a payload
  // such as "7" prints 1 and a non-numeric payload prints NaN.
  console.log('********' + (message.value % 2) + '********');
}

consumer.on('data', handleMessage);
// Logs the outcome of each delivery report from the producer exported by
// ./producer.
//   err    - delivery error, if any; previously ignored, now logged so failed
//            deliveries are visible.
//   report - delivery report object; always logged, as before.
producer.producer.on('delivery-report', function (err, report) {
  if (err) {
    console.error('delivery-report error');
    console.error(err);
  }
  // Report of delivery statistics here:
  console.log('delivery-report: ' + JSON.stringify(report));
});
// Prints whatever the client passes along when the consumer disconnects.
function onConsumerDisconnected(arg) {
  console.log('consumer disconnected. ' + JSON.stringify(arg));
}

consumer.on('disconnected', onConsumerDisconnected);

// Start the consumer by opening the connection.
consumer.connect();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment