Skip to content

Instantly share code, notes, and snippets.

@lenconda
Last active February 8, 2019 15:51
Show Gist options
  • Save lenconda/a0204b2ab4a4e03df09fc6871e11f7d2 to your computer and use it in GitHub Desktop.
Save lenconda/a0204b2ab4a4e03df09fc6871e11f7d2 to your computer and use it in GitHub Desktop.
Another producer consumer model
// /src/publisher.js
const amqp = require('amqp')
const {
MQ_HOST,
MQ_PORT,
MQ_LOGIN,
MQ_PASSWORD,
MQ_TIMEOUT,
MQ_QUEUE } = require('../../config')
const getLogger = require('../utils/logger')
const _connection = Symbol('connection')
const logger = getLogger(__filename)
class CheckerPublisher {
/**
* @constructor
* @param {Array<any>} queue
*/
constructor () {
this[_connection] = amqp.createConnection({
host: MQ_HOST,
port: MQ_PORT,
login: MQ_LOGIN,
password: MQ_PASSWORD,
connectionTimeout: MQ_TIMEOUT,
}, {reconnect: false})
}
/**
*
* start the publisher
*
* @param {Array<any>} queue
* @private
*/
start (queue) {
this[_connection]
.on('ready', () => {
let cursor = 0
let interval = setInterval(() => {
this[_connection].publish(MQ_QUEUE, queue[cursor])
logger.info(`published ${queue[cursor]} to ${MQ_QUEUE} at ${MQ_HOST}:${MQ_PORT}`)
cursor += 1
if (queue.length - 1 < cursor) clearInterval(interval)
}, 1000)
})
}
/**
*
* stop the publisher
*
* @private
*/
stop () {
this[_connection].disconnect()
}
}
module.exports = CheckerPublisher
// /services/publisher.service.js
const Publisher = require('../src/checker/publisher')
const publisher = new Publisher()
publisher.start(Array(500).fill(null).map((value, index) => `this is the ${index} task`))
// /src/worker.js
const amqp = require('amqp')
const {
MQ_HOST,
MQ_PORT,
MQ_LOGIN,
MQ_PASSWORD,
MQ_TIMEOUT,
MQ_QUEUE } = require('../../config')
const getLogger = require('../utils/logger')
const request = require('superagent')
require('superagent-proxy')(request)
const { Clock } = require('../utils/timer')
const _connection = Symbol('connection')
const _consume = Symbol('consume')
const _checker = Symbol('checker')
const _checker_http = Symbol('checkerHttp')
const _checker_https = Symbol('checkerHttps')
const _clock = Symbol('clock')
const logger = getLogger(__filename)
class Worker {
/**
* @constructor
*/
constructor (pid) {
this[_connection] = amqp.createConnection({
host: MQ_HOST,
port: MQ_PORT,
login: MQ_LOGIN,
password: MQ_PASSWORD,
connectionTimeout: MQ_TIMEOUT,
})
this[_clock] = new Clock()
logger.info(`start worker at ${Date.parse(new Date())} with pid ${pid}`)
this.pid = pid
}
/**
*
* fetch task from RabbitMQ
*
* @private
*/
[_consume] () {
this[_connection]
.queue(MQ_QUEUE, queue => {
queue.subscribe(message => {
logger.info(`[pid: ${this.pid}] consume '${unescape(message.data)}' from ${MQ_QUEUE} at ${MQ_HOST}:${MQ_PORT}`)
})
})
}
/**
*
* start the consumer
*
* @private
*/
start () {
this[_connection].on('ready', () => {
this[_consume]()
})
}
/**
*
* stop the consumer
*
* @private
*/
stop () {
this[_connection].disconnect()
}
}
module.exports = Worker
// /services/worker.service.js
const { exec } = require('child_process')
for (let i = 0; i < 5; i++) {
exec('npm run worker')
}
// /scripts/worker.js
const Worker = require('../checker/consumer')
const worker = new Worker(process.pid)
worker.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment