Created
October 3, 2018 23:48
-
-
Save Pchelolo/0203c271fe97e270e40195bc415cd22d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
const { EventEmitter } = require('events'); | |
const P = require('bluebird'); | |
/**
 * Probability of committing the offset of a message that did not match
 * the rule. For some rules, like null edits in change-prop, a match is
 * very improbable, because they share a common topic with many other
 * rules that fire far more frequently.
 *
 * If offsets were only committed for matching messages, a large false
 * backlog could build up - after a restart a lot of messages would be
 * reread and continuously rejected.
 *
 * To avoid that, commit the offsets of rejected messages from time to time.
 *
 * @const
 * @type {number}
 */
const NO_MATCH_COMMIT_PROBABILITY = 0.01; | |
/**
 * Handle for a single consumed message, passed to the listeners of the
 * stream's `'event'` emission. The listener reports the outcome of the
 * processing through `ack()`, `commit()` or `reject()`.
 */
class Event {
    /**
     * @param {Object} message the message this event stands for. The very same
     *   object is added to the stream's pending set and later handed to
     *   `EventStream#_notifyFinished`.
     *   NOTE(review): `_notifyFinished` reads `topic` and `offset` from it -
     *   confirm that the object passed in here carries those properties.
     * @param {EventStream} eventStream the stream that produced the event.
     */
    constructor(message, eventStream) {
        // Kept on the instance: every method below has to refer to the
        // message, and nothing else is in scope to get it from.
        this.message = message;
        this._eventStream = eventStream;
    }

    /**
     * Marks the message as matched and in progress.
     */
    ack() {
        // We're pushing it to pending messages only if it matched so that items
        // that don't match don't count against the concurrency limit.
        this._eventStream._pendingMsgs.add(this.message);
    }

    /**
     * Marks the message as completely processed and resumes consumption
     * if the stream is idle and below its concurrency limit.
     */
    commit() {
        const stream = this._eventStream;
        stream._notifyFinished(this.message);
        if (stream._pendingMsgs.size < stream.concurrency && !stream._consuming) {
            stream._consume();
        }
    }

    /**
     * Marks the message as not matching. Its offset is only handed over for
     * committing with probability `NO_MATCH_COMMIT_PROBABILITY`.
     */
    reject() {
        if (Math.random() < NO_MATCH_COMMIT_PROBABILITY) {
            // Nothing is returned: the stream performs the commit itself,
            // asynchronously.
            this._eventStream._notifyFinished(this.message);
        }
    }
}
/**
 * Wraps a consumer and re-emits every consumed message as an `'event'`
 * carrying an `Event` object. It also tracks which messages are still being
 * processed, so that an offset is only committed once no pending message of
 * the same topic has a lower or equal offset.
 *
 * NOTE(review): the following names are read by this class but are neither
 * initialised by its constructor nor defined in the visible part of the
 * file: `this.concurrency`, `this.consumerBatchSize`, `this.options`,
 * `this._logger`, `this._safeParse`, `kafka`, `DEFAULT_COMMIT_INTERVAL`.
 * Presumably they are provided elsewhere - confirm before relying on this.
 */
class EventStream extends EventEmitter {
    /**
     * @param {Object} consumer object exposing `consumeAsync(batchSize)` and
     *   `commitMessageAsync(message)`, both returning promises.
     */
    constructor(consumer) {
        super();
        this._consumer = consumer;
        // Messages that were acked but are not finished yet.
        this._pendingMsgs = new Set();
        // topic -> finished messages whose offsets are still to be committed.
        this._pendingCommits = new Map();
        // Handle of the scheduled commit run, null when none is scheduled.
        this._commitTimeout = null;
        this._consuming = false;
        this._connected = false;
    }

    start() {
    }

    stop() {
    }

    /**
     * Fetches one batch from the consumer, emits an `'event'` per message and
     * keeps re-invoking itself for as long as the number of pending messages
     * stays below `this.concurrency`.
     */
    _consume() {
        if (!this._connected) {
            // This can be reached from the `finally` handler below while the
            // flag is still set. Clear it, otherwise `Event#commit` would
            // consider the stream busy forever and never resume it.
            this._consuming = false;
            return;
        }

        this._consuming = true;
        this._consumer.consumeAsync(this.consumerBatchSize)
        .then((messages) => {
            if (!messages.length) {
                // No new messages, delay a bit and try again.
                return P.delay(100);
            }
            // NOTE(review): the event is built from the parsed payload, while
            // the commit bookkeeping reads `topic`/`offset` from whatever the
            // event hands back - confirm the parsed object carries them.
            for (const msg of messages) {
                const message = this._safeParse(msg.value.toString('utf8'));
                this.emit('event', new Event(message, this));
            }
            return undefined;
        })
        .catch((e) => {
            // These errors must come from the KafkaConsumer
            // since the actual handler must never throw errors
            switch (e.code) {
                case kafka.CODES.ERRORS.ERR__PARTITION_EOF:
                case kafka.CODES.ERRORS.ERR__TIMED_OUT:
                    // We're reading too fast, nothing is there, slow down a little bit
                    return P.delay(100);
                case kafka.CODES.ERRORS.ERR__STATE:
                    if (!this._connected) {
                        // We've stopped the consumer ourselves, nothing to report.
                        return undefined;
                    }
                    // KafkaConsumer is disconnected or entered error state,
                    // but not because we stopped it.
                    // Give it some time to reconnect before the new attempt
                    // to fetch messages again to avoid a tight loop
                    this._logger.log(`error/consumer`, e);
                    return P.delay(1000);
                default:
                    // Something else is terribly wrong.
                    this._logger.log(`error/consumer`, e);
                    return undefined;
            }
        })
        .finally(() => {
            if (this._pendingMsgs.size < this.concurrency) {
                this._consume();
            } else {
                // Concurrency limit reached; `Event#commit` restarts the
                // consumption once enough messages are finished.
                this._consuming = false;
            }
        });
    }

    /**
     * Removes a message from the pending set, queues its offset for
     * committing and schedules a commit run unless one is already scheduled.
     *
     * @param {Object} finishedMsg the finished message; `topic` and `offset`
     *   are read from it.
     */
    _notifyFinished(finishedMsg) {
        this._pendingMsgs.delete(finishedMsg);

        if (this.options.test_mode) {
            // Checked before queueing: no commit run is ever scheduled in
            // test mode, so a queued message would never be removed again.
            this._logger.log('trace/commit', 'Running in TEST MODE; Offset commits disabled');
            return;
        }

        const commitQueue = this._pendingCommits.get(finishedMsg.topic);
        if (commitQueue) {
            commitQueue.push(finishedMsg);
        } else {
            this._pendingCommits.set(finishedMsg.topic, [ finishedMsg ]);
        }

        if (!this._commitTimeout) {
            this._commitTimeout = setTimeout(
                () => this._commitFinished(),
                DEFAULT_COMMIT_INTERVAL
            );
        }
    }

    /**
     * Tells whether a pending message of the same topic exists whose offset
     * is lower than or equal to the one of the commit candidate.
     *
     * NOTE(review): only `topic` and `offset` are compared; if a topic can
     * have several partitions their offsets are unrelated - confirm.
     *
     * @param {Object} proposedToCommit the commit candidate.
     * @return {boolean}
     */
    _hasPendingUpTo(proposedToCommit) {
        // Iterate the Set itself: `Set#entries()` yields `[value, value]`
        // pairs, on which `topic` and `offset` are always undefined.
        for (const pendingMsg of this._pendingMsgs) {
            if (pendingMsg.topic === proposedToCommit.topic
                    && pendingMsg.offset <= proposedToCommit.offset) {
                return true;
            }
        }
        return false;
    }

    /**
     * Commit run: for every topic commits the highest finished offset that
     * is not preceded by a still-pending message and drops the covered
     * messages from the queue. Commit failures are logged, not propagated.
     *
     * @return {Promise|undefined} settles when all commits are done;
     *   undefined when the stream is not connected.
     */
    _commitFinished() {
        const toCommit = [];
        this._commitTimeout = null;
        if (!this._connected) {
            return undefined;
        }

        for (const [topic, commitQueue] of this._pendingCommits.entries()) {
            commitQueue.sort((msg1, msg2) => msg1.offset - msg2.offset);
            // Length of the leading run of messages that are safe to commit.
            let committable = 0;
            while (committable < commitQueue.length
                    && !this._hasPendingUpTo(commitQueue[committable])) {
                committable += 1;
            }
            if (committable > 0) {
                // Committing the last message of the run covers the whole run.
                toCommit.push(commitQueue[committable - 1]);
                this._pendingCommits.set(topic, commitQueue.slice(committable));
            }
        }

        return P.all(toCommit.map((message) =>
            this._consumer.commitMessageAsync(message)
            .catch((e) => {
                this._logger.log(`error/commit`, () => ({
                    msg: 'Commit failed',
                    offset: message.offset,
                    raw_event: message.value.toString(),
                    description: e.toString()
                }));
            })
        ));
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment