Last active
February 8, 2019 15:51
-
-
Save lenconda/a0204b2ab4a4e03df09fc6871e11f7d2 to your computer and use it in GitHub Desktop.
Another producer consumer model
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// /src/publisher.js | |
const amqp = require('amqp') | |
const { | |
MQ_HOST, | |
MQ_PORT, | |
MQ_LOGIN, | |
MQ_PASSWORD, | |
MQ_TIMEOUT, | |
MQ_QUEUE } = require('../../config') | |
const getLogger = require('../utils/logger') | |
const _connection = Symbol('connection') | |
const logger = getLogger(__filename) | |
class CheckerPublisher { | |
/** | |
* @constructor | |
* @param {Array<any>} queue | |
*/ | |
constructor () { | |
this[_connection] = amqp.createConnection({ | |
host: MQ_HOST, | |
port: MQ_PORT, | |
login: MQ_LOGIN, | |
password: MQ_PASSWORD, | |
connectionTimeout: MQ_TIMEOUT, | |
}, {reconnect: false}) | |
} | |
/** | |
* | |
* start the publisher | |
* | |
* @param {Array<any>} queue | |
* @private | |
*/ | |
start (queue) { | |
this[_connection] | |
.on('ready', () => { | |
let cursor = 0 | |
let interval = setInterval(() => { | |
this[_connection].publish(MQ_QUEUE, queue[cursor]) | |
logger.info(`published ${queue[cursor]} to ${MQ_QUEUE} at ${MQ_HOST}:${MQ_PORT}`) | |
cursor += 1 | |
if (queue.length - 1 < cursor) clearInterval(interval) | |
}, 1000) | |
}) | |
} | |
/** | |
* | |
* stop the publisher | |
* | |
* @private | |
*/ | |
stop () { | |
this[_connection].disconnect() | |
} | |
} | |
module.exports = CheckerPublisher |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// /services/publisher.service.js | |
const Publisher = require('../src/checker/publisher') | |
const publisher = new Publisher() | |
publisher.start(Array(500).fill(null).map((value, index) => `this is the ${index} task`)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// /src/worker.js | |
const amqp = require('amqp') | |
const { | |
MQ_HOST, | |
MQ_PORT, | |
MQ_LOGIN, | |
MQ_PASSWORD, | |
MQ_TIMEOUT, | |
MQ_QUEUE } = require('../../config') | |
const getLogger = require('../utils/logger') | |
const request = require('superagent') | |
require('superagent-proxy')(request) | |
const { Clock } = require('../utils/timer') | |
const _connection = Symbol('connection') | |
const _consume = Symbol('consume') | |
const _checker = Symbol('checker') | |
const _checker_http = Symbol('checkerHttp') | |
const _checker_https = Symbol('checkerHttps') | |
const _clock = Symbol('clock') | |
const logger = getLogger(__filename) | |
class Worker { | |
/** | |
* @constructor | |
*/ | |
constructor (pid) { | |
this[_connection] = amqp.createConnection({ | |
host: MQ_HOST, | |
port: MQ_PORT, | |
login: MQ_LOGIN, | |
password: MQ_PASSWORD, | |
connectionTimeout: MQ_TIMEOUT, | |
}) | |
this[_clock] = new Clock() | |
logger.info(`start worker at ${Date.parse(new Date())} with pid ${pid}`) | |
this.pid = pid | |
} | |
/** | |
* | |
* fetch task from RabbitMQ | |
* | |
* @private | |
*/ | |
[_consume] () { | |
this[_connection] | |
.queue(MQ_QUEUE, queue => { | |
queue.subscribe(message => { | |
logger.info(`[pid: ${this.pid}] consume '${unescape(message.data)}' from ${MQ_QUEUE} at ${MQ_HOST}:${MQ_PORT}`) | |
}) | |
}) | |
} | |
/** | |
* | |
* start the consumer | |
* | |
* @private | |
*/ | |
start () { | |
this[_connection].on('ready', () => { | |
this[_consume]() | |
}) | |
} | |
/** | |
* | |
* stop the consumer | |
* | |
* @private | |
*/ | |
stop () { | |
this[_connection].disconnect() | |
} | |
} | |
module.exports = Worker |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// /services/worker.service.js | |
const { exec } = require('child_process') | |
for (let i = 0; i < 5; i++) { | |
exec('npm run worker') | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// /scripts/worker.js | |
const Worker = require('../checker/consumer') | |
const worker = new Worker(process.pid) | |
worker.start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment