Last active
October 26, 2019 20:28
-
-
Save lehno/947dafda091e4704acd6b262c99fceeb to your computer and use it in GitHub Desktop.
SQS Mixin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
// Use this to produce and consume jobs from AWS SQS as if it were a normal queue list.
// Add it as a mixin and declare your queue handlers under `SQSQueues` in the service schema.
const { Consumer } = require('sqs-consumer'); | |
const Producer = require('sqs-producer'); | |
const _ = require('lodash'); | |
const AWS = require('aws-sdk'); | |
module.exports = function (url) { | |
return { | |
settings: { | |
url: url || 'https://sqs.eu-west-1.amazonaws.com/<USERID>' | |
}, | |
methods: { | |
getSQSQueueConsumer (name, fn) { | |
if (!this.$consumers[name]) { | |
try { | |
let consumer = Consumer.create({ | |
queueUrl: `${this.settings.url}/${name}`, | |
sqs: new AWS.SQS(), | |
handleMessage: fn.bind(this) | |
}); | |
consumer.on('error', (err) => { | |
this.logger.error(err); | |
}); | |
consumer.on('processing_error', (err) => { | |
this.logger.error(err); | |
}); | |
consumer.on('timeout_error', (err) => { | |
this.logger.error(err); | |
}); | |
this.$consumers[name] = consumer; | |
} catch (err) { | |
this.logger.error(err); | |
return this.Promise.reject('Unable to start queue'); | |
} | |
} | |
return this.$consumers[name]; | |
}, | |
getSQSQueueProducer (name) { | |
if (!this.$producers[name]) { | |
this.$producers[name] = Producer.create({ | |
queueUrl: `${this.settings.url}/${name}`, | |
sqs: new AWS.SQS() | |
}); | |
} | |
return this.$producers[name]; | |
}, | |
async addSQSJob (name, message) { | |
let queue = this.getSQSQueueProducer(name); | |
queue.send(JSON.stringify(message), function (err) { | |
if (err) this.logger.error(err); | |
}); | |
}, | |
}, | |
created () { | |
AWS.config.loadFromPath('./config/env/aws-development.json'); | |
this.$consumers = {}; | |
this.$producers = {}; | |
}, | |
started () { | |
if (!this.settings.url) { | |
return this.Promise.reject('Missing options URL'); | |
} | |
try { | |
if (this.schema.SQSQueues) { | |
_.forIn(this.schema.SQSQueues, (fn, name) => { | |
let channel = this.getSQSQueueConsumer(name, fn); | |
channel.start(); | |
}); | |
} | |
return this.Promise.resolve(); | |
} catch (err) { | |
this.logger.error(err); | |
return this.Promise.reject('Unable to connect to AMQP'); | |
} | |
} | |
}; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment