Created
February 1, 2021 23:30
-
-
Save mallendeo/acecfacac74a9676f23d8882e1040602 to your computer and use it in GitHub Desktop.
RabbitMQ lib example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import amqplib, { Channel, ConsumeMessage, Replies } from 'amqplib' | |
import isFunction from 'lodash/isFunction' | |
import config from '../config' | |
import { logger } from './logger' | |
// Broker connection settings read from the project configuration.
const { user, password, host } = config.amqplib

// Percent-encode the credentials so reserved URL characters (`@`, `:`, `/`, `#`, ...)
// inside a user name or password cannot corrupt the connection URL.
// NOTE(review): assumes `config.amqplib` stores raw (not already percent-encoded) values — confirm.
const credentials =
  user && password ? `${encodeURIComponent(user)}:${encodeURIComponent(password)}@` : ''
const url = `amqp://${credentials}${host}`

/**
 * Connection promise created once when this module is loaded.
 * The helpers in this module await it before creating a channel.
 */
export const open = amqplib.connect(url)
/**
 * Creates a new channel on the module-level connection.
 *
 * @param prefetch - Optional prefetch count applied to the new channel.
 *   A falsy value (omitted or `0`) leaves the channel's prefetch untouched.
 * @returns The newly created channel.
 */
export const createChannel = async (prefetch?: number): Promise<Channel> => {
  const connection = await open
  const channel = await connection.createChannel()

  if (prefetch) {
    await channel.prefetch(prefetch)
  }

  return channel
}
/**
 * Callback invoked when a message handler fails.
 * It receives the channel and the offending message so the caller can decide
 * how to settle it, plus the error and the name of the consumed queue.
 */
export type OnQueueError = (ch: Channel, msg: ConsumeMessage, err: Error, queueName: string) => void

/**
 * Builds a function that asserts a queue and, when a handler is supplied,
 * starts consuming it with manual acknowledgements (`noAck: false`).
 *
 * @param channel - Channel to reuse. When omitted, a new channel is created on
 *   the shared connection and `opts.prefetch` (default 1) is applied to it.
 *   `opts.prefetch` is not applied to a channel passed in by the caller.
 * @returns An async function taking:
 *   - `queueName`: queue to assert and consume;
 *   - `opts.assert`: additional queue names to assert before consuming;
 *   - `onMsg`: message handler; when omitted the queue is only asserted and
 *     no consumer is registered;
 *   - `onError`: called when `onMsg` rejects or throws; when omitted the
 *     error is logged instead of being dropped.
 *   It resolves with the `assertQueue` reply for `queueName`.
 * @throws Re-throws (after logging) any error raised while creating the
 *   channel, asserting queues or registering the consumer, so the promise never
 *   resolves with `undefined` in place of the declared reply.
 */
export const createQueue = (channel?: Channel) => async (
  queueName: string,
  opts?: { prefetch?: number; assert?: string[] },
  onMsg?: (msg: ConsumeMessage, ch: Channel) => Promise<any>,
  onError?: OnQueueError,
): Promise<Replies.AssertQueue> => {
  // `opts` is optional: fall back to an empty object before destructuring.
  const { prefetch = 1, assert = [] } = opts || {}

  try {
    const ch = channel || (await createChannel(prefetch))

    await Promise.all(assert.map((name) => ch.assertQueue(name)))
    const queue = await ch.assertQueue(queueName)

    // Without a handler there is nothing to deliver messages to.
    if (!onMsg) {
      return queue
    }

    const handleMessage = (msg: ConsumeMessage | null): void => {
      // amqplib invokes the callback with `null` when the broker cancels the consumer.
      if (!msg) {
        logger.error(`[${queueName}] Consumer was cancelled by the broker.`)
        return
      }

      // Starting from a resolved promise routes synchronous throws from
      // `onMsg` through the same `.catch` as asynchronous rejections.
      Promise.resolve()
        .then(() => onMsg(msg, ch))
        .catch((err: Error) => {
          if (onError) {
            onError(ch, msg, err, queueName)
          } else {
            logger.error(err)
          }
        })
    }

    // Await registration so a failure is caught below instead of being unhandled.
    await ch.consume(queueName, handleMessage, { noAck: false })

    logger.info(`[${queueName}] Waiting for messages.`)
    return queue
  } catch (err) {
    logger.error(err)
    throw err
  }
}
/**
 * Builds a publisher bound to the given channel.
 *
 * @param ch - Channel used to publish.
 * @returns An async function that JSON-serialises `data`, sends it to `queue`
 *   and resolves with the boolean returned by `ch.sendToQueue`.
 */
export const sendToQueue = (ch: Channel) => async (
  queue: string,
  data: unknown,
): Promise<boolean> => {
  const payload = Buffer.from(JSON.stringify(data))
  return ch.sendToQueue(queue, payload)
}
/**
 * Checks every given queue on the channel and reports whether all are empty.
 *
 * @param ch - Channel used to call `checkQueue`.
 * @param allQueues - Queue descriptors; only their `queue` name is read.
 * @returns `queues`, the `checkQueue` replies in input order, and `done`,
 *   which is true when every reply has `messageCount === 0`.
 */
export const getQueuesInfo = async (
  ch: Channel,
  allQueues: Replies.AssertQueue[],
): Promise<{ done: boolean; queues: Replies.AssertQueue[] }> => {
  const lookups = allQueues.map(({ queue }) => ch.checkQueue(queue))
  const queues = await Promise.all(lookups)

  // "Done" means no queue still reports messages.
  const done = !queues.some((info: Replies.AssertQueue) => info.messageCount !== 0)

  return { done, queues }
}
/** Timing options and hooks accepted by `waitForQueuesToEnd`. */
interface OptionalParams {
  /** Milliseconds between polls while any queue still has messages (default 1000). */
  interval?: number
  /** Milliseconds to wait after the queues first report empty, before the final poll (default 5000). */
  waitOnEnd?: number
  /** Called with the status after every poll, including the final one. */
  onStatus?: (status: unknown) => unknown
  /** Called once with the final status, just before the promise resolves. */
  onEnd?: (status: unknown) => unknown
}

/**
 * Polls the given queues until all of them are empty, waits `waitOnEnd`
 * milliseconds, polls one last time and resolves with that final status.
 *
 * The final status is returned as-is: `done` may be `false` if messages
 * arrived during the grace period, so callers should inspect it.
 *
 * @param ch - Channel used to check the queues.
 * @param allQueues - Queues to watch.
 * @returns The status obtained by the final poll.
 * @throws Rejects if a poll fails or a hook throws. Previously such an error
 *   was unhandled and left the returned promise pending forever.
 */
export const waitForQueuesToEnd = async (
  ch: Channel,
  allQueues: Replies.AssertQueue[],
  { interval = 1000, waitOnEnd = 5000, onStatus, onEnd }: OptionalParams = {},
): Promise<{ done: boolean; queues: Replies.AssertQueue[] }> => {
  const sleep = (ms: number): Promise<void> =>
    new Promise<void>((resolve) => {
      setTimeout(resolve, ms)
    })

  // Poll until every queue reports zero messages.
  for (;;) {
    const status = await getQueuesInfo(ch, allQueues)
    if (isFunction(onStatus)) onStatus(status)
    if (status.done) break
    await sleep(interval)
  }

  // Grace period, then a final poll whose result is reported to the caller.
  await sleep(waitOnEnd)
  const finalStatus = await getQueuesInfo(ch, allQueues)
  if (isFunction(onStatus)) onStatus(finalStatus)
  if (isFunction(onEnd)) onEnd(finalStatus)

  return finalStatus
}
// Re-export the imported amqplib module as this module's default export.
export default amqplib
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment