Created
May 2, 2021 15:33
-
-
Save intech/cef1c814341565ca5543dcd10828857d to your computer and use it in GitHub Desktop.
Moleculer Middleware redis-smq
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { callbackify } = require("util"); | |
const { GracefulStopTimeoutError } = require("moleculer").Errors; | |
const { Message, Producer, Consumer } = require("redis-smq"); | |
const events = require("redis-smq/src/events"); | |
module.exports = function QueueMiddleware() { | |
const producers = new Map(); | |
const consumers = new Map(); | |
function gracefulShutdown(broker, items) { | |
if (!items || !items.size) return broker.Promise.resolve(); | |
return new broker.Promise(resolve => { | |
let timedOut = false; | |
const timeout = setTimeout(() => { | |
timedOut = true; | |
broker.logger.error(new GracefulStopTimeoutError({ | |
items: items.keys() | |
})); | |
resolve(); | |
}, 5e3); | |
let first = true; | |
const checkForDown = () => { | |
broker.logger.info(`🦄 Queue items down ${items.size}`); | |
if (items.size) { | |
clearTimeout(timeout); | |
resolve(); | |
} else { | |
if (first) { | |
broker.logger.warn( | |
`🦄 Queue waiting for ${items.size} running item(s)...` | |
); | |
first = false; | |
} | |
if (!timedOut) setTimeout(checkForDown, 100); | |
} | |
}; | |
setImmediate(checkForDown); | |
}); | |
} | |
function getQueueName(schema) { | |
if (schema.version != null && !(schema.settings || { | |
}).$noVersionPrefix) { | |
return formatQueueName(`v${schema.version}.${schema.name}`); | |
} | |
return formatQueueName(schema.name); | |
} | |
function formatQueueName(queue) { | |
return queue.toLowerCase().replace(/[^a-z0-9_-]/g, "-"); | |
} | |
return { | |
name: "Queue", | |
serviceCreating(service, schema) { | |
if (schema.queues) { | |
for(let [name, handler] of Object.entries(schema.queues)) { | |
let options = { | |
}; | |
if(typeof handler === "object") { | |
handler = handler.handler; | |
options = handler.options; | |
} else if(typeof handler !== "function") throw new Error(`Required handler for queue '${name}'`); | |
Consumer.queueName = `${getQueueName(schema)}-${name}`; | |
const consumer = new Consumer(service.broker.options.queue, options); | |
consumer.consume = callbackify(handler).bind(service); | |
consumer.on(events.DOWN, () => consumers.delete(name)); | |
consumers.set(Consumer.queueName, consumer); | |
consumer.run(); | |
service.broker.logger.info( | |
`🦄 Queue consumer ${name} in service ${Consumer.queueName}` | |
); | |
} | |
service.broker.logger.info( | |
`🦄 Queue consumer registers ${Object.keys(schema.queues).length}` | |
); | |
} | |
}, | |
// After the broker created | |
created(broker) { | |
class Job extends Message { | |
constructor(queue) { | |
super(); | |
this.queue = formatQueueName(queue); | |
} | |
async produce() { | |
return new broker.Promise((resolve, reject) => { | |
let producer = null; | |
if (!producers.has(this.queue)) { | |
producer = new Producer(this.queue, broker.options.queue); | |
producer.on(events.DOWN, () => producers.delete(this.queue)); | |
producers.set(this.queue, producer); | |
} else producer = producers.get(this.queue); | |
producer.produceMessage(this, err => (err ? reject(err) : resolve())); | |
}); | |
} | |
} | |
if(!broker.options.queue) broker.options.queue = { | |
}; | |
broker.options.queue.namespace = broker.options.namespace; | |
if(!Array.isArray(broker.options.replCommands)) broker.options.replCommands = []; | |
broker.options.replCommands.push({ | |
command: "queue", | |
alias: "job", | |
description: "Add new job", | |
options: [ | |
{ | |
option: "-n, --queue <queueName>", description: "Queue name" | |
}, | |
{ | |
option: "-b, --body <body>", description: "Job body" | |
} | |
], | |
types: { | |
//number: ["service"] | |
}, | |
async action(broker, args) { | |
const { options } = args; | |
console.log(options); | |
const job = broker.queue.newJob(options.queue); | |
let payload = options.body; | |
if (typeof(payload) == "string") payload = JSON.parse(payload); | |
if(payload) job.setBody(payload); | |
const startTime = process.hrtime(); | |
const result = await job.produce(); | |
const diff = process.hrtime(startTime); | |
const duration = (diff[0] + diff[1] / 1e9) * 1000; | |
broker.logger.info(`Done ${options.queue}:${job.getId()}, duration: ${duration}ms`); | |
broker.logger.info(job.toString(), result); | |
} | |
}); | |
broker.logger.info("🦄 Queue middleware ready"); | |
broker.queue = { | |
newJob: (queue) => new Job(queue) | |
}; | |
}, | |
started(broker) { | |
this.logger.info(broker.options.queue); | |
}, | |
// Before broker stopping | |
stopping(broker) { | |
return Promise.all([ | |
gracefulShutdown(broker, producers), | |
gracefulShutdown(broker, consumers) | |
]); | |
} | |
}; | |
}; |
const job = broker.queue.newJob("v1.batch.test"); // template queues: v1.service.handler or service.handler
job.setBody({ test: 123 });
// in docs more
Job instance of Message: https://github.com/weyoss/redis-smq/blob/master/docs/api/message.md
Hey @intech, I have found your gist while googling and decided to leave you a comment.
Many design and API changes have been made since the last time we were discussing RedisSMQ.
Now the MQ is more suitable for micro-services. Please take a look at https://github.com/weyoss/redis-smq for an overview.
Also the number of Redis connections used by the MQ, and the number of queries to Redis server, were optimized and significantly reduced while maintaining the performance.
So, I hope you try it again. Your feedbacks are welcome )
@weyoss hey! Thanks for the reply. I'll check the new version!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Simple integrate with REPL and magic layer for consumer&producer =)