Skip to content

Instantly share code, notes, and snippets.

@offlinehacker
Last active May 9, 2016 00:27
Show Gist options
  • Select an option

  • Save offlinehacker/e4f9d84acc372582cbe2fa3dc7c9ed7e to your computer and use it in GitHub Desktop.

Select an option

Save offlinehacker/e4f9d84acc372582cbe2fa3dc7c9ed7e to your computer and use it in GitHub Desktop.
PubSub
'use strict';
const amqp = require('amqplib-easy')('amqp://localhost');
// Exchange that events are published to: the `amq.headers` exchange,
// declared with the `headers` exchange type.
const exchangeOptions = {
  exchangeType: 'headers',
  exchange: 'amq.headers',
};
// Grab the connection returned by amqp.connect() and attach an 'error'
// listener to it, so connection-level errors get logged. A failure to
// obtain the connection in the first place is logged the same way.
const reportRabbitError = (err) => {
  console.log('rabbitmq error', err);
};

amqp
  .connect()
  .then((connection) => {
    connection.on('error', (err) => reportRabbitError(err));
  })
  .catch((err) => reportRabbitError(err));
// Headers sent along with the message; these are the values a consumer
// can select on.
const headers = {
  service: 'core',
  type: 'gatewayConnection',
  action: 'update',
  user: 'a264728e-eaad-4121-808d-90d7c94a0843',
  gateway: 'cdfac656-4855-4c49-abe5-836d54e4a3ba',
};

// Message payload for an update: carries both the new value and the
// value it replaced.
const data = {
  value: {
    status: 1,
  },
  oldValue: {
    status: 0,
  },
};
// Publish the event: `data` is sent to the exchange described by
// `exchangeOptions`, with `headers` attached as the message headers.
// The second argument is passed as an empty string (presumably the routing
// key, which a headers exchange should not need; confirm against the
// amqplib-easy docs).
amqp
  .publish(exchangeOptions, '', data, { headers })
  .then(() => console.log('message published'))
  // Include the error itself so a failed publish can be diagnosed; logging
  // only the fixed message would hide the cause.
  .catch((err) => console.error('error publishing message', err));
'use strict';
const amqp = require('amqplib-easy')('amqp://localhost');
// Fetch the connection from amqp.connect() and register an 'error'
// listener on it so that connection errors are logged. If the connection
// cannot be obtained, that error is logged with the same message.
const logConnectionError = (err) => {
  console.log('rabbitmq error', err);
};

amqp
  .connect()
  .then((connection) => {
    connection.on('error', (err) => logConnectionError(err));
  })
  .catch((err) => logConnectionError(err));
// Header values used to select which messages this consumer receives.
const selector = {
  type: 'gatewayConnection',
  service: 'core',
};

// Name of the queue the consumer reads from.
const queue = 'test';
// Subscription settings: consume from `queue` on the `amq.headers` headers
// exchange, passing `selector` as the `arguments` so that messages are
// chosen by their header values.
const subscription = {
  exchange: 'amq.headers',
  exchangeType: 'headers',
  arguments: selector,
  queue,
  topics: [''],
};

// Message handler: logs the headers and the parsed body of each message.
// Returning a resolved promise acks the message; a rejected promise would
// nack it.
function handleMessage(msg) {
  console.log('headers', msg.properties.headers);
  console.log('new message', msg.json);
  return Promise.resolve();
}

// Start consuming and report whether the subscription succeeded.
amqp
  .consume(subscription, handleMessage)
  .then(() => {
    console.log('successfully subscribed');
  })
  .catch((err) => {
    console.log('subscription error', err);
  });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment