Skip to content

Instantly share code, notes, and snippets.

@barisere
Last active February 6, 2022 08:35
Show Gist options
  • Save barisere/6cf7641165ac2f0b4a6e57c0e59a4e44 to your computer and use it in GitHub Desktop.
Save barisere/6cf7641165ac2f0b4a6e57c0e59a4e44 to your computer and use it in GitHub Desktop.
A TypeScript class that allows you read AMQP messages one-at-a-time by sequencing them through a Node.js event emitter.
import { subscriber } from '@private-rabbitmq-wrapper';
import { EventEmitter, once } from 'events';
import { ConsumeMessage, Replies } from 'amqplib';
interface Options {
exchange: string;
eventName: string;
queueName: string;
forwardedEventName: (_: ConsumeMessage) => string;
}
/**
* AMQPMessageSequencer listens on a queue for messages and
* forwards them over a Node.js EventEmitter.
* The purpose of this class is to facilitate tests that listen on
* a queue for signals to continue. If many of such tests run, one
* consumer can receive the messages for other tests, thus causing
* the other tests to stall while only one proceeds. This occurred
* in a `test.each` case.
* We could poll the queue after a timeout, but timeouts can be finicky in tests.
* A push API is preferable to a pull API here.
*
* @example
* const sequencer = new AMQPMessageSequencer({
* exchange: 'test.exchange',
* eventName: 'event-name',
* queueName: 'queue-name',
* forwardedEventName(msg) { return msg.toString() }
* });
* await sequencer.start();
* sequencer.once('forwarded-event', () => Promise.resolve(3));
* await sequencer.stop();
*/
export class AMQPMessageSequencer {
private tag?: Replies.Consume;
private readonly doneOutbox = new EventEmitter();
constructor(private readonly options: Options) {}
async start() {
this.tag = this.tag ?? (await this.listen());
}
private listen(): Promise<Replies.Consume> {
return subscriber.on(
this.options.exchange,
this.options.eventName,
this.options.queueName,
(msg) => {
subscriber.acknowledgeMessage(msg);
this.doneOutbox.emit(this.options.forwardedEventName(msg));
}
);
}
async stop() {
this.tag &&
(await subscriber
.getChannel()
.cancel(this.tag.consumerTag)
.then(() => (this.tag = void 0)));
}
async once<T extends unknown>(
forwardedEventName: string,
run: () => Promise<T>
): Promise<[any[], T]> {
const forwardedEventOnce = once(this.doneOutbox, forwardedEventName);
const result = await run();
const forwardedArgs = await forwardedEventOnce;
return [forwardedArgs, result];
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment