Last active
February 6, 2022 08:35
-
-
Save barisere/6cf7641165ac2f0b4a6e57c0e59a4e44 to your computer and use it in GitHub Desktop.
A TypeScript class that allows you read AMQP messages one-at-a-time by sequencing them through a Node.js event emitter.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { subscriber } from '@private-rabbitmq-wrapper'; | |
import { EventEmitter, once } from 'events'; | |
import { ConsumeMessage, Replies } from 'amqplib'; | |
interface Options { | |
exchange: string; | |
eventName: string; | |
queueName: string; | |
forwardedEventName: (_: ConsumeMessage) => string; | |
} | |
/** | |
* AMQPMessageSequencer listens on a queue for messages and | |
* forwards them over a Node.js EventEmitter. | |
* The purpose of this class is to facilitate tests that listen on | |
* a queue for signals to continue. If many of such tests run, one | |
* consumer can receive the messages for other tests, thus causing | |
* the other tests to stall while only one proceeds. This occurred | |
* in a `test.each` case. | |
* We could poll the queue after a timeout, but timeouts can be finicky in tests. | |
* A push API is preferable to a pull API here. | |
* | |
* @example | |
* const sequencer = new AMQPMessageSequencer({ | |
* exchange: 'test.exchange', | |
* eventName: 'event-name', | |
* queueName: 'queue-name', | |
* forwardedEventName(msg) { return msg.toString() } | |
* }); | |
* await sequencer.start(); | |
* sequencer.once('forwarded-event', () => Promise.resolve(3)); | |
* await sequencer.stop(); | |
*/ | |
export class AMQPMessageSequencer { | |
private tag?: Replies.Consume; | |
private readonly doneOutbox = new EventEmitter(); | |
constructor(private readonly options: Options) {} | |
async start() { | |
this.tag = this.tag ?? (await this.listen()); | |
} | |
private listen(): Promise<Replies.Consume> { | |
return subscriber.on( | |
this.options.exchange, | |
this.options.eventName, | |
this.options.queueName, | |
(msg) => { | |
subscriber.acknowledgeMessage(msg); | |
this.doneOutbox.emit(this.options.forwardedEventName(msg)); | |
} | |
); | |
} | |
async stop() { | |
this.tag && | |
(await subscriber | |
.getChannel() | |
.cancel(this.tag.consumerTag) | |
.then(() => (this.tag = void 0))); | |
} | |
async once<T extends unknown>( | |
forwardedEventName: string, | |
run: () => Promise<T> | |
): Promise<[any[], T]> { | |
const forwardedEventOnce = once(this.doneOutbox, forwardedEventName); | |
const result = await run(); | |
const forwardedArgs = await forwardedEventOnce; | |
return [forwardedArgs, result]; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment