Created
October 8, 2014 17:20
-
-
Save johndstein/78246f10c6d9cfd3ea63 to your computer and use it in GitHub Desktop.
kafka.js
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env node | |
var kafka = require('kafka-node'); | |
var winston = require('winston'); | |
var _ = require('lodash'); | |
/**
 * Wraps a kafka-node Consumer together with a dedicated kafka-node Client.
 * https://github.com/SOHU-Co/kafka-node#consumer
 *
 * We intentionally only allow a single topic, group, partition per consumer.
 * For now, we always autoCommit and use the offset from the commit.
 *
 * `options` is a flattened version of what is documented at
 *   https://github.com/SOHU-Co/kafka-node#consumer
 * and
 *   https://github.com/alexguan/node-zookeeper-client#client-createclientconnectionstring-options
 *
 * The defaults are the literal values listed in the constructor body.
 * Caller-supplied values are deep-merged over them, except that three values
 * are always forced after the merge, whatever the caller passed:
 *
 *   1. client.clientId     is always 'HIP_INGEST'
 *   2. consumer.autoCommit is always true
 *   3. consumer.fromOffset is always false
 *
 * The extra `logSilly` option, when truthy, console.logs every message
 * received.
 *
 * Instance properties set here: `options`, `key`, `client`, `consumer`.
 *
 * @constructor
 * @param {Object} [options] Partial options; shape mirrors the defaults below.
 */
function Consumer(options) {
  var self = this;

  // The defaults literal is the merge destination, so the caller's `options`
  // object is only read from, never written to.
  this.options = _.merge({
    logSilly: false,
    client: {
      connectionString: 'localhost:2181',
      clientId: 'HIP_INGEST',
      sessionTimeout: 30000,
      spinDelay: 1000,
      retries: 0
    },
    consumer: {
      topic: 'test',
      groupId: 'test',
      partition: 0,
      offset: 0,
      autoCommit: true,
      autoCommitIntervalMs: 5000,
      fetchMaxWaitMs: 100,
      fetchMinBytes: 1,
      fetchMaxBytes: 10240,
      fromOffset: false
    }
  }, options);

  // Non-negotiable settings: overwrite whatever the merge produced.
  this.options.client.clientId = 'HIP_INGEST';
  this.options.consumer.autoCommit = true;
  this.options.consumer.fromOffset = false;

  var consumerOptions = this.options.consumer;
  var clientOptions = this.options.client;

  this.key = Consumer.key(consumerOptions.topic,
    consumerOptions.groupId,
    consumerOptions.partition);

  // Logged before `client`/`consumer` exist; toString only serializes
  // `this.options`, so this is safe.
  winston.info('Instantiating ' + this.toString(true));

  // Learned the VERY hard way: each consumer MUST use a DIFFERENT client
  // instance. You can NOT share them.
  this.client = new kafka.Client(clientOptions.connectionString,
    clientOptions.clientId,
    clientOptions);

  // Original author's note: the HighLevelConsumer is BROKEN, so we just use
  // the plain Consumer.
  // https://github.com/SOHU-Co/kafka-node#highlevelconsumer
  this.consumer = new kafka.Consumer(this.client, [{
    topic: consumerOptions.topic,
    partition: consumerOptions.partition,
    offset: consumerOptions.offset
  }], consumerOptions);

  this.consumer.on('error', function logKafkaConsumerErrors(err) {
    winston.error('Kafka consumer error: ' + err + '. Consumer: ' + self.toString(true));
  });

  if (this.options.logSilly) {
    this.consumer.on('message', function(msg) {
      console.log('Kafka consumer silly logging', JSON.stringify(msg, null, 2));
    });
  }
}

/**
 * Builds the identifier used for a consumer: the three parts joined by '|'.
 *
 * @param {*} topic
 * @param {*} groupId
 * @param {*} partition
 * @returns {string} `topic|groupId|partition`
 */
Consumer.key = function key(topic, groupId, partition) {
  return [String(topic), String(groupId), String(partition)].join('|');
};

/**
 * Human-readable description: 'Kafka consumer <key> ' followed by the
 * JSON-serialized options.
 *
 * Both the compact and the pretty form are computed on the first call and
 * cached on the instance, so later changes to `this.options` are NOT
 * reflected. Options are not supposed to be modified after construction.
 *
 * @param {boolean} [pretty] Truthy for the 2-space indented form.
 * @returns {string}
 */
Consumer.prototype.toString = function toString(pretty) {
  if (!this.stringified) {
    var prefix = 'Kafka consumer ' + this.key + ' ';
    this.stringifiedPretty = prefix + JSON.stringify(this.options, null, 2);
    this.stringified = prefix + JSON.stringify(this.options);
  }
  return pretty ? this.stringifiedPretty : this.stringified;
};
/**
 * Holds a set of Consumers, keyed by `consumer.key`, and lets callers attach
 * message handlers to them by topic and groupId.
 *
 * One Consumer is constructed per entry in `configs`, in order.
 *
 * This may need methods like close all, add, remove, etc.
 *
 * NOTE(review): two configs that resolve to the same key overwrite each other
 * in `this.consumers`; the earlier Consumer is still constructed but is no
 * longer reachable from the registry (so `close()` will not close it).
 * Confirm whether callers can ever pass such duplicates.
 *
 * @constructor
 * @param {Array<Object>} [configs] Option objects passed to `new Consumer()`.
 */
function Registry(configs) {
  var consumers = {};
  this.configs = configs || [];
  this.consumers = consumers;
  this.configs.forEach(function(cfg) {
    var consumer = new Consumer(cfg);
    consumers[consumer.key] = consumer;
  });
}

/**
 * Closes every registered consumer so the program can exit gracefully.
 *
 * @param {*} [force] Passed straight through to each underlying
 *   `consumer.close(force)` call.
 */
Registry.prototype.close = function close(force) {
  this.listConsumers().forEach(function(entry) {
    entry.consumer.close(force);
  });
};

/**
 * Lists registered consumers.
 *
 * With a falsy `topic`, every consumer is returned. Otherwise only consumers
 * whose configured topic AND groupId both strictly equal the arguments are
 * returned (so passing a topic without a groupId matches nothing unless a
 * consumer's groupId is literally undefined).
 *
 * @param {string} [topic]
 * @param {string} [groupId]
 * @returns {Array} A new array of Consumer wrappers, in registration order.
 */
Registry.prototype.listConsumers = function listConsumers(topic, groupId) {
  var registered = this.consumers;
  // Object.keys yields own enumerable keys only.
  var all = Object.keys(registered).map(function(key) {
    return registered[key];
  });
  if (!topic) {
    return all;
  }
  return all.filter(function(entry) {
    var cfg = entry.options.consumer;
    return cfg.topic === topic && cfg.groupId === groupId;
  });
};

/**
 * Registers `fn` as a 'message' listener on every consumer of the given topic
 * and groupId.
 *
 * Since not all consumers will be configured to run, registering for a
 * topic/groupId with no consumers is allowed: it is logged and otherwise
 * ignored.
 *
 * @param {string} topic
 * @param {string} groupId
 * @param {Function} fn Listener attached via `consumer.on('message', fn)`.
 */
Registry.prototype.onMessage = function onMessage(topic, groupId, fn) {
  var matches = this.listConsumers(topic, groupId);
  if (matches.length === 0) {
    winston.info('No kafka consumer registered for topic, ' + topic + ', groupId, ' + groupId);
  }
  matches.forEach(function(entry) {
    winston.info('Registering kafka handler for topic, ' + topic + ', groupId, ' + groupId);
    entry.consumer.on('message', fn);
  });
};
// module.exports = Consumer;
module.exports = Registry;

// If we are run from the command line you can play around with this code.
// Won't do anything if we are required as a module.
//
// `require.main === module` is the documented way to detect direct execution;
// `module.parent` is deprecated and is also absent in cases other than direct
// execution.
if (require.main === module) {
  // How long the demo listens before shutting down, in milliseconds.
  var demoRunMs = 5000;

  var reg = new Registry([{
    logSilly: true,
    consumer: {
      topic: 'job_result',
      groupId: 'job_result'
    }
  }]);

  reg.onMessage('job_result', 'job_result', function(msg) {
    console.log('HERE is a REG message handler', msg);
  });

  // Close the consumers after a short while so the process can exit.
  setTimeout(function() {
    reg.close();
  }, demoRunMs);

  // Alternative demo that drives a Consumer directly instead of a Registry:
  //
  // var consumer = new Consumer({
  //   logSilly: true,
  //   consumer: {
  //     topic: 'job_result',
  //     groupId: 'job_result'
  //   },
  //   client: {
  //     clientId: 'freddy'
  //   }
  // });
  //
  // setTimeout(function() {
  //   consumer.consumer.close();
  // }, 5000);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment