Skip to content

Instantly share code, notes, and snippets.

@mallendeo
Created February 1, 2021 23:30
Show Gist options
  • Save mallendeo/acecfacac74a9676f23d8882e1040602 to your computer and use it in GitHub Desktop.
Save mallendeo/acecfacac74a9676f23d8882e1040602 to your computer and use it in GitHub Desktop.
RabbitMQ lib example
import amqplib, { Channel, ConsumeMessage, Replies } from 'amqplib'
import isFunction from 'lodash/isFunction'
import config from '../config'
import { logger } from './logger'
const { user, password, host } = config.amqplib
const url = user && password ? `amqp://${user}:${password}@${host}` : `amqp://${host}`
export const open = amqplib.connect(url)
export const createChannel = async (prefetch?: number): Promise<Channel> => {
const conn = await open
const ch = await conn.createChannel()
prefetch && (await ch.prefetch(prefetch))
return ch
}
export type OnQueueError = (ch: Channel, msg: ConsumeMessage, err: Error, queueName: string) => void
export const createQueue = (channel?: Channel) => async (
queueName: string,
opts?: { prefetch?: number; assert?: string[] },
onMsg?: (msg: ConsumeMessage, ch: Channel) => Promise<any>,
onError?: OnQueueError,
): Promise<Replies.AssertQueue> => {
const { prefetch = 1, assert = [] } = opts
try {
let ch = channel
if (!ch) {
const conn = await open
ch = await conn.createChannel()
await ch.prefetch(prefetch)
}
await Promise.all([...assert].map((q) => ch.assertQueue(q)))
const queue = await ch.assertQueue(queueName)
ch.consume(
queueName,
(msg: ConsumeMessage) =>
onMsg(msg, ch).catch((err: Error) => onError && onError(ch, msg, err, queueName)),
{ noAck: false },
)
logger.info(`[${queueName}] Waiting for messages.`)
return queue
} catch (err) {
logger.error(err)
}
}
export const sendToQueue = (ch: Channel) => async (
queue: string,
data: unknown,
): Promise<boolean> => ch.sendToQueue(queue, Buffer.from(JSON.stringify(data)))
export const getQueuesInfo = async (
ch: Channel,
allQueues: Replies.AssertQueue[],
): Promise<{ done: boolean; queues: Replies.AssertQueue[] }> => {
const queues = await Promise.all(
allQueues.map((queue: Replies.AssertQueue) => ch.checkQueue(queue.queue)),
)
return {
done: queues.every((queue: Replies.AssertQueue) => queue.messageCount === 0),
queues,
}
}
interface OptionalParams {
interval?: number
waitOnEnd?: number
onStatus?: (status: unknown) => unknown
onEnd?: (status: unknown) => unknown
}
export const waitForQueuesToEnd = (
ch: Channel,
allQueues: Replies.AssertQueue[],
{ interval = 1000, waitOnEnd = 5000, onStatus, onEnd }: OptionalParams = {},
): Promise<{ done: boolean; queues: Replies.AssertQueue[] }> =>
new Promise((resolve) => {
const check = async (done?: boolean) => {
const status = await getQueuesInfo(ch, allQueues)
isFunction(onStatus) && onStatus(status)
if (done) {
isFunction(onEnd) && onEnd(status)
return resolve(status)
}
if (!status.done) {
return setTimeout(check, interval)
}
setTimeout(() => check(true), waitOnEnd)
}
check()
})
export default amqplib
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment