Skip to content

Instantly share code, notes, and snippets.

@intech
Last active February 19, 2023 16:57
Show Gist options
  • Save intech/b6e809c729835cb1da6411c4a940846b to your computer and use it in GitHub Desktop.
Save intech/b6e809c729835cb1da6411c4a940846b to your computer and use it in GitHub Desktop.
Moleculer middleware Knex
const Knex = require("knex");
const { MoleculerServerError } = require("moleculer").Errors;
// const saveAsyncStack = require('knex/lib/util/save-async-stack');
const { types } = require("pg");
types.setTypeParser(20, (value) => +value);
types.setTypeParser(1700, (value) => +value);
module.exports = function KnexMiddleware(options) {
options = {
client: "pg",
connection: process.env.DB,
searchPath: ["public"],
asyncStackTraces: true,
pool: {
max: 10,
min: 2,
acquireTimeoutMillis: 5000,
createTimeoutMillis: 5000,
destroyTimeoutMillis: 5000,
idleTimeoutMillis: 30000,
reapIntervalMillis: 1000,
createRetryIntervalMillis: 200,
log: true,
},
acquireConnectionTimeout: 10000,
...options,
};
const connection = new Knex(options);
/*
const camelCase = require('lodash/camelCase');
const isArray = require('lodash/isArray');
const isObject = require('lodash/isObject');
const mapKeys = require('lodash/mapKeys');
// snake_case to camelCase
const toCamelCase = (value) => camelCase(value)
const camelizeKeys = (object) => mapKeys(object, (value, key) => toCamelCase(key))
const postProcessRow = (row) => isObject(row) ? camelizeKeys(row) : row
const postProcessResponse = (result, queryContext) => {
if (!result) return
return isArray(result) ? result.map(postProcessRow) : postProcessRow(result)
}
// camelCase to snake_case
const toSnakeCase = (value) => underscored(value)
const wrapIdentifier = (value, origImpl, queryContext) => {
if (value !== '*') value = toSnakeCase(value)
return origImpl(value)
}
*/
// connection.client.config.postProcessResponse = (response, queryContext) => {
// console.log("postProcessResponse:", response, queryContext);
// // const tags = { method: query.method, uuid: query.__knexUid };
// // if(isTracing && tracers[tags.uuid]) {
// if(isTracing && queryContext && queryContext.tracing) {
// tracers[queryContext.uuid] = undefined;
// queryContext.tracing.finish();
// }
// return response;
// };
//
// connection.client.config.wrapIdentifier = (id, origImpl) => {
// console.log("wrapIdentifier:", id);
// return origImpl(id);
// };
function wrapQuery(next) {
return function (ctx) {
ctx.crdb = (q, options) => connection(q, options).queryContext({
ctx,
});
ctx.transaction = (trx) => connection.client.transaction(trx, {
ctx,
});
return next(ctx);
};
}
return {
name: "Knex",
async created(broker) {
const isMetrics = broker.isMetricsEnabled();
const isTracing = broker.isTracingEnabled();
(function (runner) {
connection.client.runner = function () {
const _runner = runner.apply(this, arguments);
(function (run) {
_runner.run = function () {
let span; let
metric;
const args = arguments;
if (isTracing) {
const context = this.builder.queryContext();
let { tracer } = broker;
if (context && "ctx" in context) tracer = context.ctx.span;
span = tracer.startSpan(`db.query '${this.builder.toString()}'`, {
type: "query",
});
}
if (isMetrics) {
broker.metrics.increment("db.query.total");
broker.metrics.increment("db.query.active");
metric = broker.metrics.timer("db.query.time", {
method: this.builder._method,
query: "query",
});
}
const res = run.apply(this, args);
return res.catch((e) => {
if (isTracing && span) { span.setError(e); }
broker.logger.error(e);
throw new MoleculerServerError("Database query error", e.code);
}).finally(() => {
if (isTracing && span) { span.finish(); }
if (isMetrics) {
if (metric) metric();
broker.metrics.decrement("db.query.active");
}
});
};
}(_runner.run));
return _runner;
};
}(connection.client.runner));
(function (transaction) {
connection.client.transaction = function () {
let span; let
metric;
if (isTracing) {
const context = arguments[1];
let { tracer } = broker;
if (context && "ctx" in context) tracer = context.ctx.span;
span = tracer.startSpan(`db.transaction '${(new Error().stack.split("at ")[5]).trim()}'`, {
type: "tx",
});
}
if (isMetrics) {
broker.metrics.increment("db.query.total");
broker.metrics.increment("db.query.active");
metric = broker.metrics.timer("db.query.time", {
query: "transaction",
});
}
const res = transaction.apply(this, arguments);
return res.catch((e) => {
if (isTracing && span) { span.setError(e); }
broker.logger.error(e);
throw new MoleculerServerError("Database query error", e.code);
}).finally(() => {
if (isTracing && span) { span.finish(); }
if (isMetrics) {
if (metric) metric();
broker.metrics.decrement("db.query.active");
}
});
};
}(connection.client.transaction));
this.connection = connection;
if (isMetrics) {
broker.metrics.register({
name: "db.query.total",
type: "counter",
unit: "query",
description: "Number of queries",
rate: true,
});
broker.metrics.register({
name: "db.query.errors",
type: "counter",
unit: "query",
description: "Number of error queries",
rate: true,
});
broker.metrics.register({
name: "db.query.active",
type: "gauge",
unit: "query",
description: "Number of active queries",
});
broker.metrics.register({
name: "db.query.time",
type: "histogram",
labelNames: ["query"],
quantiles: true,
buckets: true,
unit: "milliseconds",
description: "Query times in milliseconds",
rate: true,
});
}
},
localAction: wrapQuery,
localEvent: wrapQuery,
async starting() {
try {
await this.connection.queryBuilder().select([1]);
this.logger.info("Connection has been established successfully.");
} catch (error) {
this.logger.fatal(error);
}
this.logger.info("Knex middleware ready");
},
async stopping() {
if (this.connection) await this.connection.destroy();
this.logger.info("Knex middleware close");
},
serviceCreating(service, schema) {
if (!schema.name.startsWith("$")) {
service.crdb = this.connection;
this.logger.debug(`Knex for \`${schema.name}\` ready`);
}
},
};
};
// ...
const Knex = require("./middlewares/knex");
// ...
module.exports = {
// ...
middlewares: [Knex()]
// ...
};
module.exports = {
name: "test",
actions: {
test: {
rest: "GET test",
async handler(ctx) {
// use knex query builder
return ctx.knex("table").sum("num as sum");
}
}
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment