Created
May 2, 2021 15:33
-
-
Save intech/cef1c814341565ca5543dcd10828857d to your computer and use it in GitHub Desktop.
Moleculer Middleware redis-smq
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { callbackify } = require("util"); | |
const { GracefulStopTimeoutError } = require("moleculer").Errors; | |
const { Message, Producer, Consumer } = require("redis-smq"); | |
const events = require("redis-smq/src/events"); | |
module.exports = function QueueMiddleware() { | |
const producers = new Map(); | |
const consumers = new Map(); | |
function gracefulShutdown(broker, items) { | |
if (!items || !items.size) return broker.Promise.resolve(); | |
return new broker.Promise(resolve => { | |
let timedOut = false; | |
const timeout = setTimeout(() => { | |
timedOut = true; | |
broker.logger.error(new GracefulStopTimeoutError({ | |
items: items.keys() | |
})); | |
resolve(); | |
}, 5e3); | |
let first = true; | |
const checkForDown = () => { | |
broker.logger.info(`🦄 Queue items down ${items.size}`); | |
if (items.size) { | |
clearTimeout(timeout); | |
resolve(); | |
} else { | |
if (first) { | |
broker.logger.warn( | |
`🦄 Queue waiting for ${items.size} running item(s)...` | |
); | |
first = false; | |
} | |
if (!timedOut) setTimeout(checkForDown, 100); | |
} | |
}; | |
setImmediate(checkForDown); | |
}); | |
} | |
function getQueueName(schema) { | |
if (schema.version != null && !(schema.settings || { | |
}).$noVersionPrefix) { | |
return formatQueueName(`v${schema.version}.${schema.name}`); | |
} | |
return formatQueueName(schema.name); | |
} | |
function formatQueueName(queue) { | |
return queue.toLowerCase().replace(/[^a-z0-9_-]/g, "-"); | |
} | |
return { | |
name: "Queue", | |
serviceCreating(service, schema) { | |
if (schema.queues) { | |
for(let [name, handler] of Object.entries(schema.queues)) { | |
let options = { | |
}; | |
if(typeof handler === "object") { | |
handler = handler.handler; | |
options = handler.options; | |
} else if(typeof handler !== "function") throw new Error(`Required handler for queue '${name}'`); | |
Consumer.queueName = `${getQueueName(schema)}-${name}`; | |
const consumer = new Consumer(service.broker.options.queue, options); | |
consumer.consume = callbackify(handler).bind(service); | |
consumer.on(events.DOWN, () => consumers.delete(name)); | |
consumers.set(Consumer.queueName, consumer); | |
consumer.run(); | |
service.broker.logger.info( | |
`🦄 Queue consumer ${name} in service ${Consumer.queueName}` | |
); | |
} | |
service.broker.logger.info( | |
`🦄 Queue consumer registers ${Object.keys(schema.queues).length}` | |
); | |
} | |
}, | |
// After the broker created | |
created(broker) { | |
class Job extends Message { | |
constructor(queue) { | |
super(); | |
this.queue = formatQueueName(queue); | |
} | |
async produce() { | |
return new broker.Promise((resolve, reject) => { | |
let producer = null; | |
if (!producers.has(this.queue)) { | |
producer = new Producer(this.queue, broker.options.queue); | |
producer.on(events.DOWN, () => producers.delete(this.queue)); | |
producers.set(this.queue, producer); | |
} else producer = producers.get(this.queue); | |
producer.produceMessage(this, err => (err ? reject(err) : resolve())); | |
}); | |
} | |
} | |
if(!broker.options.queue) broker.options.queue = { | |
}; | |
broker.options.queue.namespace = broker.options.namespace; | |
if(!Array.isArray(broker.options.replCommands)) broker.options.replCommands = []; | |
broker.options.replCommands.push({ | |
command: "queue", | |
alias: "job", | |
description: "Add new job", | |
options: [ | |
{ | |
option: "-n, --queue <queueName>", description: "Queue name" | |
}, | |
{ | |
option: "-b, --body <body>", description: "Job body" | |
} | |
], | |
types: { | |
//number: ["service"] | |
}, | |
async action(broker, args) { | |
const { options } = args; | |
console.log(options); | |
const job = broker.queue.newJob(options.queue); | |
let payload = options.body; | |
if (typeof(payload) == "string") payload = JSON.parse(payload); | |
if(payload) job.setBody(payload); | |
const startTime = process.hrtime(); | |
const result = await job.produce(); | |
const diff = process.hrtime(startTime); | |
const duration = (diff[0] + diff[1] / 1e9) * 1000; | |
broker.logger.info(`Done ${options.queue}:${job.getId()}, duration: ${duration}ms`); | |
broker.logger.info(job.toString(), result); | |
} | |
}); | |
broker.logger.info("🦄 Queue middleware ready"); | |
broker.queue = { | |
newJob: (queue) => new Job(queue) | |
}; | |
}, | |
started(broker) { | |
this.logger.info(broker.options.queue); | |
}, | |
// Before broker stopping | |
stopping(broker) { | |
return Promise.all([ | |
gracefulShutdown(broker, producers), | |
gracefulShutdown(broker, consumers) | |
]); | |
} | |
}; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@weyoss hey! Thanks for the reply. I'll check the new version!