Created
September 16, 2017 15:47
-
-
Save iamjochem/c6d74c4ed916acab56973d7b8f0a0697 to your computer and use it in GitHub Desktop.
[NodeJS:bunyan,rabbitmq] a custom bunyan stream for publishing log messages to AMQP/RabbitMQ
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Bunyan AMQP Stream Transport for ES6 | |
'use strict'; | |
// 3rd party deps | |
const AMQP = require('amqplib/callback_api'); | |
const stringify = require('json-stringify-safe'); | |
const mydebug = require('debug')('bunyan:amq'); | |
// our deps | |
const genTopic = require('./gentopic'); // used to generate a RabbitMQ topic/routing-key | |
const loglevels = require('./log_levels'); // used to translates Bunyan integer to levels to strings to be used as AMQP topics | |
// locals | |
const trueP = Promise.resolve(true); | |
const falseP = Promise.resolve(false); | |
const MAX_Q_LEN = 50000; // max length of unpublished messages queue () | |
// expose the module! | |
module.exports = (options) => new BunyanAMQPTransport(options); | |
/**
 * Bunyan stream that publishes every log record to an AMQP topic exchange.
 *
 * Records handed to `write()` are queued in `this.unpublished` (newest first)
 * and published oldest-first by `flush()` once a channel is available. The
 * routing key of each record is produced by `genTopic()`.
 */
class BunyanAMQPTransport
{
    /**
     * @param {Object}  [options]
     * @param {String}  [options.exchange='bunyan_logs'] - name of the topic exchange to publish to
     * @param {Boolean} [options.ignore_errors]          - when truthy, connection/channel errors are not written to stderr
     *
     * The same object is passed to `genDSN()` to build the connection URI
     * (`dsn`, `username`, `password`, `host`, `port`, `vhost`).
     */
    constructor(options)
    {
        // genDSN() tolerates missing options, so the constructor does too
        const opts = options || {};

        this.total_received     = 0;
        this.total_published    = 0;
        this.unpublished        = [];
        this.exchange           = opts.exchange ? opts.exchange : 'bunyan_logs';

        /**
         * getch() : get channel
         *
         * Called without an argument it lazily connects (delaying each
         * reconnect attempt by up to 5 seconds) and resolves with the channel.
         * Called with a truthy argument it starts shutting the connection down;
         * if that argument is a function it is invoked once the connection is closed.
         *
         * @param  {Function|*} [shutdown]
         * @return {Promise} - promise for a channel object
         */
        this.getch = ((dns) => {
            let conn_to, conref, chnref, prom, reconnect_cnt = 0, shutting_down = false;

            const myself = this;

            // closes channel + connection, but only once nothing is in flight any more
            function close(shutdown_fn)
            {
                if (myself.running || myself.flooded || myself.unpublished.length) {
                    mydebug('delaying closing connection, messages still in-flight');
                    return setTimeout(() => close(shutdown_fn), 250);
                }

                const done       = typeof shutdown_fn === 'function' ? shutdown_fn : () => {};
                const doDone     = () => { mydebug(`received ${myself.total_received} & published ${myself.total_published} log messages during process life-time`); done(); };
                const doCloseCon = () => (conref ? conref.close(doDone) : doDone());
                const doClose    = () => (chnref ? chnref.close(doCloseCon) : doCloseCon());

                if (myself.last_flush) {
                    // wait 5 ms per recently flushed message (at least 100 ms) before closing
                    const offset = Math.max(myself.last_flush_count, 20) * 5,
                          now    = +new Date
                          ;

                    if (myself.last_flush > now - offset) {
                        // both values are in milliseconds
                        mydebug(myself.last_flush_count ? `last flushed ${myself.last_flush_count} log messages ${now - myself.last_flush} ms ago ... waiting ${offset} ms before closing AMQ connection.`
                                                        : `last flushed log messages ${now - myself.last_flush} ms ago ... waiting ${offset} ms before closing AMQ connection`);

                        return setTimeout(() => doClose(), offset);
                    }
                }

                doClose();
            }

            // error handler: closes the current connection and forgets it so the next getch() reconnects
            function reset(err)
            {
                // TODO: better way to handle potential error in this context?
                //       (bear in mind this is an error inside of a logger stream transport!?)
                if (err && !opts.ignore_errors)
                    console.error(err); // eslint-disable-line no-console

                close();

                if (shutting_down)
                    return; // keep the references while shutting down, close() still needs them

                conref = null;
                chnref = null;
                prom   = null;
                reconnect_cnt += 1;
            }

            function reset_throw(err)
            {
                reset(err); throw err;
            }

            return (shutdown) => {
                if (shutdown && !shutting_down) {
                    shutting_down = true;
                    close(shutdown);
                }

                if (shutting_down)
                    return Promise.resolve( chnref );

                if (!prom) {
                    prom = new Promise((resolve, reject) => {
                        if (conn_to)
                            clearTimeout(conn_to);

                        conn_to = setTimeout(() => {
                            AMQP.connect(dns, {}, (e, o) => { if (e) reject(e); else resolve(o); });
                        }, Math.min(5000, reconnect_cnt * 1000));
                    });

                    prom = prom .then ( con => (conref = con).createChannel() )
                                .then ( chn => (chnref = chn).assertExchange(this.exchange, 'topic', { durable: true }) )
                                .then ( ()  => reconnect_cnt = 0 )
                                .then ( ()  => {
                                    const block = () => this.flooded = true;
                                    const drain = () => this.flush(chnref, true);

                                    conref.on('error', reset);
                                    chnref.on('error', reset);
                                    chnref.on('drain', drain);

                                    // RabbitMQ specific extension (see here: http://www.rabbitmq.com/connection-blocked.html)
                                    // amqplib emits these two events on the connection, not on the channel
                                    conref.on('blocked',   block);
                                    conref.on('unblocked', drain);
                                })
                                .then ( ()  => chnref )
                                .catch( reset_throw );
                }

                return prom;
            };
        })(genDSN(opts));
    }

    /**
     * Publishes queued messages, oldest first, until the queue is empty or
     * the channel signals back-pressure.
     *
     * @param  {Object}  channel   - AMQP channel; nothing happens when it is falsy
     * @param  {Boolean} [drained] - pass `true` when called from a 'drain'/'unblocked' handler, this lifts the `flooded` state
     * @return {Promise<Boolean>}  - resolves `true` when the whole queue was published without back-pressure, `false` otherwise
     */
    flush(channel, drained)
    {
        if (!channel || this.running)
            return falseP;

        if (drained === true) {
            this.last_flush_count = null;
            this.flooded          = false;
        }

        if (this.flooded || !this.unpublished.length)
            return falseP;

        if (this.unpublished.length > MAX_Q_LEN) {
            mydebug(`max queue length exceeded, dropping ${this.unpublished.length - MAX_Q_LEN} message(s).`);
            // new messages are unshift()ed, so this keeps the newest and drops the oldest
            this.unpublished = this.unpublished.slice(0, MAX_Q_LEN);
        }

        const queued = this.unpublished.length;

        this.running = true;

        try {
            // a loop instead of one recursive call per message: a large backlog must not exhaust the call stack
            while (this.unpublished.length) {
                const m = this.unpublished[this.unpublished.length - 1];
                const r = channel.publish(this.exchange, m[0], m[1], { contentType : 'application/json' });

                this.last_flush       = +new Date;
                this.total_published += 1;

                // publish() mimics stream.write(): a `false` return means "buffer full, wait for 'drain'",
                // the message itself has been accepted, so it is removed either way to avoid duplicates
                this.unpublished.pop();

                if (r === false) {
                    this.flooded = true;
                    break;
                }
            }
        } finally {
            // never leave the guard set, otherwise a throwing publish() would block every later flush
            this.running = false;
        }

        if (queued > 1)
            this.last_flush_count = queued;

        return this.flooded ? falseP : trueP;
    }

    /**
     * Bunyan stream entry point: queues one log record and triggers a flush.
     *
     * @param {Object} message - bunyan log record; `level`, `name` and `hostname` are used to build the topic
     */
    write(message)
    {
        this.total_received += 1;

        const topic = genTopic(loglevels.nearest_lvlstr(message.level), message.name, message.hostname);

        // TODO: a better ring buffer?
        this.unpublished.unshift([ topic, Buffer.from(stringify(message, null, null)) ]);

        this.getch().then ( (c) => this.flush(c) )  // eslint-disable-next-line no-console
                    .catch( (e) => console.log('Failed to publish application log message to "%s" on AMQ exchange "%s"', topic, this.exchange, e) )
                    ;
    }

    /**
     * Flushes what is still queued (retrying once after 1.2 seconds when the
     * first attempt does not empty the queue) and then shuts the connection down.
     *
     * @param  {Function} [done] - passed on to getch() as the shutdown callback
     * @return {Promise}
     */
    close(done)
    {
        // at this stage we must swallow the error (alternative is to loop forever - which is not very helpful either)
        const finish = (err) => {
            if (this.unpublished.length)
                this.unpublished.forEach(m => {
                    const s = m[1];

                    // NOTE(review): `s` is the Buffer created in write(), it has no `level` property,
                    // so this condition is never true and nothing is printed. The original comment
                    // ("ignore unpublished info messages") also contradicts the `=== 30` test - confirm
                    // the intended behaviour before changing it.
                    if (s.level === 30) // eslint-disable-next-line no-console
                        console.error(s);
                });

            if (err) // eslint-disable-next-line no-console
                console.error(err);

            this.getch(done).catch(done);
        };

        return this.getch().then((c) => {
            if (!this.unpublished.length)
                return finish();

            // Promise.resolve().then(...) turns a synchronous throw from publish() into a rejection
            return Promise.resolve().then(() => this.flush(c)).then((ok) => {
                if (!ok)
                    throw new Error('failed to [completely] flush AMQ logging-connection, waiting a little while to try one more time.');

                return finish();
            }).catch(() => {
                setTimeout(() => {
                    if (!c)
                        return finish();

                    // the boolean flush result is not an error, only a rejection reason is handed to finish()
                    Promise.resolve().then(() => this.flush(c)).then(() => finish(), finish);
                }, 1200).unref();
            });
        });
    }
}
/**
 * Builds the AMQP connection URI for the transport.
 *
 * @param  {Object} [opts]
 * @param  {String} [opts.dsn]              - complete URI, returned untouched when given
 * @param  {String} [opts.username='guest']
 * @param  {String} [opts.password='guest']
 * @param  {String} [opts.host='127.0.0.1']
 * @param  {Number} [opts.port=5672]
 * @param  {String} [opts.vhost]            - appended as the URI path; used as given, so pre-encode special characters
 * @return {String}
 */
function genDSN(opts)
{
    const options = opts || {};

    if (options.dsn)
        return options.dsn;

    // credentials are percent-encoded so characters such as '@', ':', '/' or '#' cannot corrupt the URI
    const usr   = encodeURIComponent(options.username || 'guest');
    const pwd   = encodeURIComponent(options.password || 'guest');
    const host  = options.host  ? options.host        : '127.0.0.1';
    const port  = options.port  ? options.port        : 5672;
    const vhost = options.vhost ? '/' + options.vhost : '';

    return `amqp://${usr}:${pwd}@${host}:${port}${vhost}`;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// character that separates the segments of an AMQ topic/routing-key
const TOPIC_SEGMENT_SEPARATOR = '.';

module.exports = { TOPIC_SEGMENT_SEPARATOR };
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// 3rd party deps | |
const isarr = require('lodash/isArray'); | |
const isbool = require('lodash/isBoolean'); | |
const isnum = require('lodash/isNumber'); | |
// our deps | |
const constants = require('./constants'); | |
// locals | |
const host_name = require('os').hostname(); | |
/** | |
* predefined filtering function for topic-part arrays, | |
* filters empty values | |
* | |
* @type {Function} | |
*/ | |
const filter_tparts = require('lodash/partialRight')(require('lodash/filter'), require('lodash/trim')); | |
/**
 * cleans an individual topic segment: the value is lower-cased and
 * EVERY occurrence of the segment separator char is removed from it,
 * so that a value such as a dotted hostname cannot introduce extra segments
 *
 * @type {Function}
 */
// split/join removes all occurrences; String.replace with a string pattern only removes the first
const tpart_clean = (s) => s.toLowerCase().split(constants.TOPIC_SEGMENT_SEPARATOR).join('');
/**
 * helper for building a topic string from an array of topic segments
 *
 * @param  {Array}   parts
 * @return {String}
 * @throws {Error}   - thrown if `parts` is not an array or contains fewer than 2 valid values
 */
function build_topic_str(parts)
{
    // a non-array is treated as "no parts", so the documented Error is thrown instead of a TypeError
    const filtered = isarr(parts) ? filter_tparts(parts) : [];

    if (filtered.length < 2)
        throw new Error(`Invalid topic parts given (${filtered.join(',')}), cannot generate a messaging topic string`);

    return filtered.map( tpart_clean ).join(constants.TOPIC_SEGMENT_SEPARATOR);
}
module.exports = function(lvl, name, host, ...args) { | |
return build_topic_str([lvl || 'error', name, host || host_name, ...args]); | |
}; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// 3rd party deps | |
const invert = require('lodash/invert'); | |
const isstr = require('lodash/isString'); | |
const isfunc = require('lodash/isFunction'); | |
const isnum = require('lodash/isNumber'); | |
const clone = require('lodash/clone'); | |
const find = require('lodash/find'); | |
const keys = require('lodash/keys'); | |
const equals = require('lodash/eq'); | |
const negate = require('lodash/negate'); | |
const mapvals = require('lodash/mapValues'); | |
const comparators = [ | |
{ fn : equals, operators : ['eq', '=', '==', '==='] }, | |
{ fn : negate(equals), operators : ['neq', '!=', '!==', 'not'] }, | |
{ fn : require('lodash/lt'), operators : ['lt', '<'] }, | |
{ fn : require('lodash/lte'), operators : ['lte', '<='] }, | |
{ fn : require('lodash/gt'), operators : ['gt', '>'] }, | |
{ fn : require('lodash/gte'), operators : ['gte', '>='] }, | |
]; | |
// locals | |
const default_lvl = 'info'; | |
const level_map = { | |
10 : 'trace', | |
20 : 'debug', | |
30 : 'info', | |
40 : 'warn', | |
50 : 'error', | |
60 : 'fatal', | |
}; | |
const level_rmap = mapvals(invert(level_map), v => parseInt(v, 10)); | |
// export module! | |
module.exports = { | |
levels : reversed => clone(reversed ? level_rmap : level_map), | |
default_level : () => default_lvl, | |
nearest_lvlstr : v => itos(nearest_lvl(v)), | |
itos, | |
stoi, | |
compare, | |
nearest_lvl, | |
}; | |
/**
 * integer to string: looks up the level name for a numeric level
 *
 * @param  {Number} v
 * @return {String|null} - `null` when `v` is not a non-negative number or not a key of `level_map`
 */
function itos(v)
{
    if (!isnum(v) || !(v >= 0))
        return null;

    return level_map[ '' + v ] || null;
}
/**
 * string to integer: looks up the numeric level for a (case-insensitive) level name
 *
 * @param  {String} v
 * @return {Number|null} - `null` when `v` is not a string or not a known level name
 */
function stoi(v)
{
    if (!isstr(v))
        return null;

    const key = v.toLowerCase();

    return level_rmap.hasOwnProperty(key) ? level_rmap[ key ] : null;
}
/**
 * compares two levels after resolving both through nearest_lvl()
 *
 * @param  {Number|String}   a
 * @param  {Number|String}   b
 * @param  {Function|String} comparator - resolved by determine_comparator_fn()
 * @return {Boolean|*}       - result of the comparator, `false` when no comparator function could be determined
 */
function compare(a, b, comparator)
{
    const lhs = nearest_lvl(a);
    const rhs = nearest_lvl(b);
    const fn  = determine_comparator_fn(comparator);

    if (!isfunc(fn))
        return false;

    return fn.call(null, lhs, rhs);
}
/**
 * resolves a comparator given as function or operator string to a function
 *
 * @param  {Function|String} comp - a function is returned as is, a string is trimmed,
 *                                  lower-cased and looked up in the `operators` of `comparators`
 * @return {Function}        - falls back to `equals` for unknown operators and any other input
 */
function determine_comparator_fn(comp)
{
    if (isfunc(comp))
        return comp;

    if (isstr(comp)) {
        const c     = comp.trim().toLowerCase();
        // Array#find stays within bounds; the former `while (i-- > -1)` loop read
        // comparators[-1] and threw a TypeError for every unknown operator
        const match = comparators.find(item => item.operators.indexOf(c) !== -1);

        if (match)
            return match.fn;
    }

    return equals;
}
/**
 * resolves a level (number or name) to one of the numeric levels in `level_map`
 *
 * An exact match is returned as is; otherwise the first level that is not
 * below the given value is used, or the last level when there is none.
 *
 * @param  {Number|String} input - a falsy non-number is treated as the default level
 * @return {Number}
 */
function nearest_lvl(input)
{
    const wanted = isnum(input) ? input : stoi(input || default_lvl);

    if (level_map[ wanted ])
        return wanted;

    const candidates = keys(level_map);
    const last_index = candidates.length - 1;
    const match      = find(candidates, (candidate, idx) => (idx === last_index) || (wanted <= candidate));

    return parseInt(match || stoi(default_lvl), 10);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment