Last active
July 27, 2022 12:51
-
-
Save intech/52b82fca6d2c5f47d7cb4cfa2d3f3c2e to your computer and use it in GitHub Desktop.
Moleculer middleware for the Objection.js ORM (https://github.com/vincit/objection.js), following the Shared database pattern (https://microservices.io/patterns/data/shared-database.html)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// file: models/Base.js | |
const { Model } = require("objection"); | |
module.exports = class BaseModel extends Model { | |
static get modelPaths() { | |
return [__dirname]; | |
} | |
static get useLimitInFirst() { | |
return true; | |
} | |
$beforeInsert() { | |
const { timestamp } = this.constructor; | |
if (timestamp && timestamp.created && this[timestamp.created] === undefined) | |
this[timestamp.created] = new Date().toISOString(); | |
} | |
$beforeUpdate() { | |
const { timestamp } = this.constructor; | |
if (timestamp && timestamp.updated && this[timestamp.updated] === undefined) | |
this[timestamp.updated] = new Date().toISOString(); | |
} | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// file: middlewares/objection.middleware.js
const { AsyncLocalStorage } = require("async_hooks");
const glob = require("glob").sync;
const Knex = require("knex");
const { Model, initialize } = require("objection");
const profilerSymbol = Symbol("profiler"); | |
module.exports = function KnexMiddleware(options) { | |
const storage = new AsyncLocalStorage(); | |
const connection = new Knex({ | |
client: "postgres", | |
connection: process.env.DB, | |
asyncStackTraces: true, | |
pool: { | |
max: 10, | |
min: 1, | |
acquireTimeoutMillis: 5000, | |
createTimeoutMillis: 5000, | |
destroyTimeoutMillis: 5000, | |
idleTimeoutMillis: 5000, | |
reapIntervalMillis: 1000, | |
createRetryIntervalMillis: 200, | |
propagateCreateError: false | |
}, | |
acquireConnectionTimeout: 10000, | |
...options | |
}); | |
let models = {}; | |
let logger = null; | |
const regexp = /(\$\d+),[\s\t\n]+?/gm; | |
return { | |
name: "Objection", | |
isTracing: false, | |
isMetrics: false, | |
[profilerSymbol]: null, | |
async created(broker) { | |
logger = broker.getLogger("objection"); | |
this.isMetrics = broker.isMetricsEnabled(); | |
this.isTracing = broker.isTracingEnabled(); | |
this[profilerSymbol] = data => { | |
const ctx = storage.getStore() || {}; | |
if (this.isTracing) { | |
const tracer = ctx.span ? ctx.span : broker.tracer; | |
const type = data.__knexTxId ? `db.${data.__knexTxId}` : "db.query"; | |
const sql = data.sql.replace(regexp, ""); | |
ctx._tracer = tracer.startSpan(`${type} '${sql}'`, { | |
type: data.__knexTxId ? "transaction" : "query" | |
}); | |
} | |
if (this.isMetrics) { | |
broker.metrics.increment("db.query.total"); | |
broker.metrics.increment("db.query.active"); | |
// create timer instance for metric query time | |
ctx._metric = broker.metrics.timer("db.query.time", { | |
method: data.method, | |
query: "query" | |
}); | |
} | |
storage.enterWith(ctx); | |
}; | |
if (this.isMetrics) { | |
broker.metrics.register({ | |
name: "db.query.total", | |
type: "counter", | |
unit: "query", | |
description: "Number of queries", | |
rate: true | |
}); | |
broker.metrics.register({ | |
name: "db.query.errors", | |
type: "counter", | |
unit: "query", | |
description: "Number of error queries", | |
rate: true | |
}); | |
broker.metrics.register({ | |
name: "db.query.active", | |
type: "gauge", | |
unit: "query", | |
description: "Number of active queries" | |
}); | |
broker.metrics.register({ | |
name: "db.query.time", | |
type: "histogram", | |
labelNames: ["query"], | |
quantiles: true, | |
buckets: true, | |
unit: "milliseconds", | |
description: "Query times in milliseconds", | |
rate: true | |
}); | |
} | |
connection.prependListener("query", data => this[profilerSymbol](data)); | |
connection.on("query-response", () => { | |
const ctx = storage.getStore(); | |
if (this.isTracing && ctx._tracer) ctx._tracer.finish(); | |
if (this.isMetrics) { | |
if (ctx._metric) ctx._metric(); | |
broker.metrics.decrement("db.query.active"); | |
} | |
}); | |
connection.on("query-error", (error, obj) => { | |
logger.debug("trace error:", obj.sql); | |
// if("originalStack" in e) { | |
// e.stack = e.originalStack; | |
// console.trace(e.originalStack); | |
// } | |
const ctx = storage.getStore(); | |
if (this.isTracing && ctx._tracer) { | |
ctx._tracer.setError(error); | |
} | |
// logger.error(error.toString()); | |
}); | |
models = glob(__dirname + "/../models/**/*.js", { absolute: true }); | |
if (!models.length) { | |
logger.warn("There is no files in directory models"); | |
} | |
Model.knex(connection); | |
models = Object.fromEntries( | |
models.map(model => { | |
const m = require(model); | |
return [m.name, m]; | |
}) | |
); | |
logger.info("Objection middleware ready"); | |
}, | |
localAction(next) { | |
return ctx => { | |
const { span } = ctx; | |
return storage.run({ span }, () => next(ctx)); | |
}; | |
}, | |
localEvent(next) { | |
return ctx => { | |
const { span } = ctx; | |
return storage.run({ span }, () => next(ctx)); | |
}; | |
}, | |
async starting() { | |
if (Object.values(models).length) { | |
await initialize(connection, Object.values(models)); | |
} else { | |
await connection.queryBuilder().select(1); | |
} | |
logger.info("Connection has been established successfully."); | |
}, | |
async stopping() { | |
if (connection) await connection.destroy(); | |
logger.info("Objection middleware close"); | |
}, | |
serviceCreating(service, schema) { | |
if (!schema.name.startsWith("$")) { | |
service.models = models; | |
logger.debug(`Objection for \`${schema.name}\` ready`); | |
} | |
} | |
}; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment