Skip to content

Instantly share code, notes, and snippets.

@intech
Created May 2, 2021 15:33
Show Gist options
  • Save intech/cef1c814341565ca5543dcd10828857d to your computer and use it in GitHub Desktop.
Save intech/cef1c814341565ca5543dcd10828857d to your computer and use it in GitHub Desktop.
Moleculer Middleware redis-smq
const { callbackify } = require("util");
const { GracefulStopTimeoutError } = require("moleculer").Errors;
const { Message, Producer, Consumer } = require("redis-smq");
const events = require("redis-smq/src/events");
module.exports = function QueueMiddleware() {
const producers = new Map();
const consumers = new Map();
/**
 * Shut down every tracked queue item and wait until the map is empty.
 *
 * Each item is asked to stop via its `shutdown()` method (when it has one) and
 * is removed from `items` under its own key once it emits `events.DOWN`, so
 * completion does not depend on how the item was registered. The map is polled
 * every 100 ms; after 5 seconds the wait is abandoned, a
 * `GracefulStopTimeoutError` listing the remaining keys is logged and the
 * promise still resolves so that broker shutdown can continue.
 *
 * @param {object} broker Broker providing `Promise` and `logger`.
 * @param {Map<string, object>} items Live producers or consumers keyed by queue name.
 * @returns {Promise<void>} Resolves when `items` is empty or the timeout elapsed.
 */
function gracefulShutdown(broker, items) {
  if (!items || !items.size) return broker.Promise.resolve();

  return new broker.Promise(resolve => {
    let timedOut = false;
    const timeout = setTimeout(() => {
      timedOut = true;
      broker.logger.error(new GracefulStopTimeoutError({
        // Materialize the keys: a Map iterator is single-use and not serializable.
        items: Array.from(items.keys())
      }));
      resolve();
    }, 5e3);

    // Iterate over a snapshot because entries may be removed while we loop.
    for (const [key, item] of Array.from(items)) {
      if (!item) {
        items.delete(key);
        continue;
      }
      if (typeof item.once === "function") {
        item.once(events.DOWN, () => items.delete(key));
      }
      if (typeof item.shutdown === "function") {
        try {
          item.shutdown();
        } catch (err) {
          // Best effort: keep stopping the remaining items; the timeout covers this one.
          broker.logger.warn(`🦄 Queue item '${key}' could not be shut down`, err);
        }
      }
    }

    let first = true;
    const checkForDown = () => {
      if (timedOut) return;
      broker.logger.info(`🦄 Queue items down ${items.size}`);
      if (items.size === 0) {
        clearTimeout(timeout);
        resolve();
        return;
      }
      if (first) {
        broker.logger.warn(
          `🦄 Queue waiting for ${items.size} running item(s)...`
        );
        first = false;
      }
      setTimeout(checkForDown, 100);
    };
    setImmediate(checkForDown);
  });
}
/**
 * Build the queue-name prefix for a service schema.
 *
 * The result is `v<version>.<name>` when the schema has a version and its
 * settings do not set `$noVersionPrefix`; otherwise it is just `<name>`.
 * Either form is passed through `formatQueueName` before being returned.
 *
 * @param {object} schema Service schema (`name`, optional `version` and `settings`).
 * @returns {string} Formatted queue-name prefix.
 */
function getQueueName(schema) {
  const { name, version, settings } = schema;
  const withoutPrefix =
    version == null || Boolean(settings && settings.$noVersionPrefix);
  const rawName = withoutPrefix ? name : `v${version}.${name}`;
  return formatQueueName(rawName);
}
/**
 * Normalize a queue name: lower-case it and replace every character other
 * than `a-z`, `0-9`, `_` and `-` with a dash.
 *
 * @param {string} queue Raw queue name.
 * @returns {string} Normalized queue name.
 */
function formatQueueName(queue) {
  const disallowed = /[^a-z0-9_-]/g;
  const lowered = queue.toLowerCase();
  return lowered.replace(disallowed, "-");
}
return {
name: "Queue",
serviceCreating(service, schema) {
if (schema.queues) {
for(let [name, handler] of Object.entries(schema.queues)) {
let options = {
};
if(typeof handler === "object") {
handler = handler.handler;
options = handler.options;
} else if(typeof handler !== "function") throw new Error(`Required handler for queue '${name}'`);
Consumer.queueName = `${getQueueName(schema)}-${name}`;
const consumer = new Consumer(service.broker.options.queue, options);
consumer.consume = callbackify(handler).bind(service);
consumer.on(events.DOWN, () => consumers.delete(name));
consumers.set(Consumer.queueName, consumer);
consumer.run();
service.broker.logger.info(
`🦄 Queue consumer ${name} in service ${Consumer.queueName}`
);
}
service.broker.logger.info(
`🦄 Queue consumer registers ${Object.keys(schema.queues).length}`
);
}
},
// After the broker created
created(broker) {
class Job extends Message {
constructor(queue) {
super();
this.queue = formatQueueName(queue);
}
async produce() {
return new broker.Promise((resolve, reject) => {
let producer = null;
if (!producers.has(this.queue)) {
producer = new Producer(this.queue, broker.options.queue);
producer.on(events.DOWN, () => producers.delete(this.queue));
producers.set(this.queue, producer);
} else producer = producers.get(this.queue);
producer.produceMessage(this, err => (err ? reject(err) : resolve()));
});
}
}
if(!broker.options.queue) broker.options.queue = {
};
broker.options.queue.namespace = broker.options.namespace;
if(!Array.isArray(broker.options.replCommands)) broker.options.replCommands = [];
broker.options.replCommands.push({
command: "queue",
alias: "job",
description: "Add new job",
options: [
{
option: "-n, --queue <queueName>", description: "Queue name"
},
{
option: "-b, --body <body>", description: "Job body"
}
],
types: {
//number: ["service"]
},
async action(broker, args) {
const { options } = args;
console.log(options);
const job = broker.queue.newJob(options.queue);
let payload = options.body;
if (typeof(payload) == "string") payload = JSON.parse(payload);
if(payload) job.setBody(payload);
const startTime = process.hrtime();
const result = await job.produce();
const diff = process.hrtime(startTime);
const duration = (diff[0] + diff[1] / 1e9) * 1000;
broker.logger.info(`Done ${options.queue}:${job.getId()}, duration: ${duration}ms`);
broker.logger.info(job.toString(), result);
}
});
broker.logger.info("🦄 Queue middleware ready");
broker.queue = {
newJob: (queue) => new Job(queue)
};
},
started(broker) {
this.logger.info(broker.options.queue);
},
// Before broker stopping
stopping(broker) {
return Promise.all([
gracefulShutdown(broker, producers),
gracefulShutdown(broker, consumers)
]);
}
};
};
@intech
Copy link
Author

intech commented Feb 3, 2022

@weyoss hey! Thanks for the reply. I'll check the new version!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment