|
/*
 * moleculer
 * Copyright (c) 2022 MoleculerJS (https://github.com/moleculerjs/moleculer)
 * MIT Licensed
 */
|
|
|
"use strict"; |
|
|
|
const BaseLogger = require("moleculer").Loggers.Base; |
|
const { register } = require("moleculer").Loggers; |
|
const _ = require("lodash"); |
|
const fetch = require("node-fetch"); |
|
fetch.Promise = Promise; |
|
|
|
const util = require("util"); |
|
|
|
/**
 * Loki logger for Moleculer.
 *
 * Log entries are buffered in an in-memory queue and pushed in batches to the
 * `/loki/api/v1/push` endpoint of the configured server, either periodically
 * (`opts.interval`) or as soon as the queue holds `opts.maxRows` entries.
 *
 * @class LokiLogger
 * @extends {BaseLogger}
 */
class LokiLogger extends BaseLogger {
	/**
	 * Creates an instance of LokiLogger.
	 *
	 * @param {Object} opts
	 * @param {String} [opts.url] Base URL of the server. Falls back to the `LOKI_URL` env variable.
	 * @param {String} [opts.username] Basic auth user name. Falls back to `LOKI_USERNAME`.
	 * @param {String} [opts.password] Basic auth password. Falls back to `LOKI_PASSWORD`.
	 * @param {Object} [opts.defaultLabels] Labels merged into the labels of every stream.
	 * @param {Function} [opts.objectPrinter] Custom serializer for object arguments.
	 * @param {Number} [opts.interval=10000] Flush period in milliseconds. Values `<= 0` disable the timer.
	 * @param {Number} [opts.maxRows=100] Queue length that triggers an immediate flush. Values `<= 0` disable it.
	 * @memberof LokiLogger
	 */
	constructor(opts) {
		super(opts);

		this.opts = _.defaultsDeep(this.opts, {
			url: process.env.LOKI_URL || "https://localhost:3100",
			username: process.env.LOKI_USERNAME,
			password: process.env.LOKI_PASSWORD,
			defaultLabels: null,
			objectPrinter: null,
			interval: 10 * 1000,
			maxRows: 100
		});

		// Rows waiting to be uploaded by the next `flush()`.
		this.queue = [];
		// Handle of the periodic flush timer (set in `init`).
		this.timer = null;
	}

	/**
	 * Initialize logger.
	 *
	 * Selects the object printer and starts the periodic flush timer.
	 *
	 * @param {LoggerFactory} loggerFactory
	 */
	init(loggerFactory) {
		super.init(loggerFactory);

		this.objectPrinter = this.opts.objectPrinter
			? this.opts.objectPrinter
			: o =>
					util.inspect(o, {
						showHidden: false,
						depth: 2,
						colors: false,
						// Keep every printed object on a single line.
						breakLength: Number.POSITIVE_INFINITY
					});

		if (this.opts.interval > 0) {
			this.timer = setInterval(() => this.flush(), this.opts.interval);
			// The flush timer alone must not keep the process alive.
			this.timer.unref();
		}
	}

	/**
	 * Stopping logger.
	 *
	 * Cancels the periodic timer and uploads whatever is still queued.
	 *
	 * @returns {Promise} Settles when the final flush has finished.
	 */
	stop() {
		if (this.timer) {
			clearInterval(this.timer);
			this.timer = null;
		}

		return this.flush();
	}

	/**
	 * Generate a new log handler.
	 *
	 * @param {object} bindings Logger bindings; `mod` is used to resolve the log level.
	 * @returns {Function|null} A `(type, args)` handler which queues the entry, or
	 *   `null` when there are no bindings or no level is resolved for the module.
	 */
	getLogHandler(bindings) {
		const level = bindings ? this.getLogLevel(bindings.mod) : null;
		if (!level) return null;

		// `_.isObject` is already true for arrays, so a separate array check is not needed.
		const printArgs = args => args.map(p => (_.isObject(p) ? this.objectPrinter(p) : p));
		const levelIdx = BaseLogger.LEVELS.indexOf(level);

		return (type, args) => {
			// Drop entries whose level sits after the configured one in `LEVELS`.
			const typeIdx = BaseLogger.LEVELS.indexOf(type);
			if (typeIdx > levelIdx) return;

			this.queue.push({
				ts: Date.now(),
				level: type,
				msg: printArgs(args).join(" "),
				bindings
			});

			if (this.opts.maxRows > 0 && this.queue.length >= this.opts.maxRows) {
				// Fire-and-forget: `flush()` handles its own upload errors.
				this.flush();
			}
		};
	}

	/**
	 * Map a Moleculer log level name to the name used for the `level` label.
	 *
	 * @param {String} level
	 * @returns {String} The mapped name, or `level` unchanged when there is no mapping.
	 */
	convertLogLevel(level) {
		switch (level) {
			case "fatal":
				return "critical";
			case "warn":
				return "warning";
			case "trace":
				return "debug";
			default:
				return level;
		}
	}

	/**
	 * Build the stream labels of a queued row.
	 *
	 * @param {Object} row Queued entry (`level`, `bindings`).
	 * @returns {Object} `defaultLabels` overlaid with `level`, `nodeID`, `namespace`,
	 *   `module` and, when the bindings carry `svc`, `service`.
	 */
	getLabels(row) {
		const labels = {
			...(this.opts.defaultLabels ? this.opts.defaultLabels : {}),
			level: this.convertLogLevel(row.level),
			nodeID: row.bindings.nodeID,
			namespace: row.bindings.ns,
			module: row.bindings.mod
		};

		if (row.bindings.svc) labels.service = row.bindings.svc;

		return labels;
	}

	/**
	 * Flush queued log entries to Grafana Loki.
	 *
	 * The queue is emptied before the request is sent, so rows of a failed upload
	 * are not retried; failures are only reported with `console.warn`.
	 *
	 * @returns {Promise} Settles when the upload attempt has finished. It does not
	 *   reject on upload errors.
	 */
	flush() {
		if (this.queue.length === 0) return this.broker.Promise.resolve();

		// Take ownership of the pending rows so entries logged meanwhile go to the next batch.
		const rows = this.queue.splice(0, this.queue.length);

		// Group rows by their complete label set. Grouping by level alone would push
		// rows of other modules/services under the labels of the first row of that level.
		const streamsByLabels = new Map();
		rows.forEach(row => {
			const labels = this.getLabels(row);
			const key = JSON.stringify(labels);

			let stream = streamsByLabels.get(key);
			if (!stream) {
				stream = { stream: labels, values: [] };
				streamsByLabels.set(key, stream);
			}

			// Milliseconds -> nanoseconds. Appending the zeros as text avoids a float
			// product that is larger than Number.MAX_SAFE_INTEGER.
			stream.values.push([`${row.ts}000000`, row.msg]);
		});

		const payload = {
			streams: Array.from(streamsByLabels.values())
		};

		const headers = { "Content-Type": "application/json" };
		if (this.opts.username) {
			// Only send credentials when configured; an `undefined` header value would
			// be stringified and sent as the literal text "undefined".
			const password = this.opts.password != null ? this.opts.password : "";
			const credentials = Buffer.from(`${this.opts.username}:${password}`).toString("base64");
			headers.Authorization = `Basic ${credentials}`;
		}

		return fetch(this.opts.url + "/loki/api/v1/push", {
			method: "post",
			body: JSON.stringify(payload),
			headers
		})
			.then(res => {
				// fetch resolves on HTTP error statuses too, so they must be checked explicitly.
				if (!res.ok) {
					console.warn(
						`Unable to upload logs to Loki server. Status: ${res.status} ${res.statusText}`
					);
				}
			})
			.catch(err => {
				console.warn("Unable to upload logs to Loki server. Error:" + err.message, err);
			});
	}
}
|
|
|
// Register the class in Moleculer's logger registry under the "Loki" name.
register("Loki", LokiLogger);

// Also expose the class itself for direct use.
module.exports = LokiLogger;