@shyiko
Last active July 26, 2017 02:57
semaphore over amqp (POC)
const amqp = require('amqplib/callback_api')
const assert = require('assert')

// with no URL argument, amqplib connects to amqp://localhost
amqp.connect((err, con) => {
  assert.ifError(err)
  console.log('connected')
  con.createChannel((err, ch) => {
    assert.ifError(err)
    // at most one unacknowledged message per consumer
    ch.prefetch(1, false)
    ch.assertQueue('Q', {durable: false, autoDelete: true, maxLength: 1}, (err) => {
      assert.ifError(err)
      // publish a token message; maxLength: 1 keeps at most one ready message in Q
      ch.sendToQueue('Q', Buffer.from(''))
      ch.consume('Q', (msg) => {
        console.log('received from Q:' + JSON.stringify(msg))
        // using a separate channel to guard against RESOURCE-LOCKED (code 405),
        // which is raised when exclusive access to Q' cannot be acquired
        // and renders the channel inoperable
        con.createChannel((err, che) => {
          assert.ifError(err)
          che.on('error', (err) => {
            if (err.code === 405) {
              // Q' is owned by another connection: give the token back by acking it
              console.log('ACK')
              ch.ack(msg)
            } else {
              throw err
            }
          })
          // declaring Q' as exclusive succeeds for one connection only;
          // the token message stays unacknowledged while the worker runs
          che.assertQueue('Q\'', {durable: false, autoDelete: true, exclusive: true},
            (err) => !err && eternalWorker())
        })
      }, {noAck: false})
    })
  })
})

function eternalWorker() {
  console.log('doing stuff')
}
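
The exclusive-queue step above can be pulled out into a small helper, which makes the "acquired" and "already locked" outcomes explicit. This is only a sketch: `tryAcquire` and `RESOURCE_LOCKED` are hypothetical names that are not part of the gist, and `con` is assumed to be an `amqplib/callback_api` connection.

```javascript
// Hypothetical helper, not part of the original gist.
// Reply code the broker uses when an exclusive queue is owned by another connection.
const RESOURCE_LOCKED = 405

// Calls done(null, true) if the exclusive queue was declared,
// done(null, false) if it is already locked, done(err) for anything else.
function tryAcquire(con, lockQueue, done) {
  // a dedicated channel, because a failed declare closes the channel it ran on
  con.createChannel((err, ch) => {
    if (err) return done(err)
    let settled = false
    const finish = (e, acquired) => {
      if (settled) return // the 'error' event and the callback may both fire
      settled = true
      done(e, acquired)
    }
    const settle = (e) => {
      if (!e) finish(null, true)
      else if (e.code === RESOURCE_LOCKED) finish(null, false)
      else finish(e)
    }
    ch.on('error', settle)
    ch.assertQueue(lockQueue, {durable: false, autoDelete: true, exclusive: true}, settle)
  })
}
```

Inside the `consume` callback this would be called as `tryAcquire(con, 'Q\'', (err, acquired) => ...)`, starting `eternalWorker()` when `acquired` is true and calling `ch.ack(msg)` when it is false.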

shyiko commented Jul 26, 2017

npm i amqplib # tested on 0.5.1
