Skip to content

Instantly share code, notes, and snippets.

@lenconda
Created March 20, 2019 14:46
Show Gist options
  • Save lenconda/7ce185fb98f8594aba75e2a6223d845c to your computer and use it in GitHub Desktop.
Save lenconda/7ce185fb98f8594aba75e2a6223d845c to your computer and use it in GitHub Desktop.
A producer-consumer model for RabbitMQ and test scripts
import config from '../../config/rabbitmq'
import { Consumer } from '../message_queue'
const consumer = new Consumer(config)
consumer.on('message', message => {
console.log(message)
})
import amqp, { AMQPClient, QueueCallback } from 'amqp'
import getLogger from './logger'
import events from 'events'
import { IRabbitMQConfig } from '../interfaces/config'
const emitter = new events.EventEmitter()
const logger = getLogger(__filename)
class MessageQueueBase extends events.EventEmitter {
constructor (config: IRabbitMQConfig) {
super()
this.connection = amqp.createConnection({
host: config.MQ_HOST,
port: config.MQ_PORT,
login: config.MQ_LOGIN,
password: config.MQ_PASSWORD,
connectionTimeout: config.MQ_TIMEOUT })
}
protected connection: AMQPClient
public destroy = () => this.connection.disconnect()
}
export class Producer extends MessageQueueBase {
constructor (config: IRabbitMQConfig) {
super(config)
this.connection.on('ready', () => {
emitter.on('publish', (message: string) => {
try {
this.connection.publish(
config.MQ_QUEUE, message, { contentEncoding: 'utf-8' }, null)
logger.info(`published ${message} to message queue`)
} catch (e) {
logger.info(e.toString())}
})
})
}
public publish = (message: string) => emitter.emit('publish', message)
}
interface IQueueCallback extends QueueCallback {
subscribe: (callback: (message: any) => any) => any
}
export class Consumer extends MessageQueueBase {
constructor (config: IRabbitMQConfig) {
super(config)
this.connection.on('ready', () => {
this.connection.queue(config.MQ_QUEUE, (queue: IQueueCallback) => {
queue.subscribe(message => {
this.emit('message', unescape(message.data))
})
})
})
}
}
import config from '../../config/rabbitmq'
import { Producer } from '../message_queue'
import uuidv4 from 'uuid/v4'
async function delay (ms) {
return new Promise(resolve => {
setTimeout(() => resolve(), ms)
})
}
async function test () {
const producer = new Producer(config)
let items = Array(100).fill(null).map((value, index) => uuidv4())
for (let item of items) {
try {
await delay(1000)
producer.publish(item)
} catch (e) {
console.log(e.toString())
continue
}
}
}
test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment