Created
March 2, 2015 13:34
-
-
Save bennadel/7a4ee62a92fc00a24603 to your computer and use it in GitHub Desktop.
Shedding The Monolithic Application With AWS Simple Queue Service (SQS) And Node.js
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Require the demo configuration. This contains settings for this demo, including | |
| // the AWS credentials and target queue settings. | |
| var config = require( "./config.json" ); | |
| // Require libraries. | |
| var aws = require( "aws-sdk" ); | |
| var Q = require( "q" ); | |
| var chalk = require( "chalk" ); | |
// Build the SQS client for this demo from the region and credentials held in the demo
// configuration.
var sqs = new aws.SQS({
	region: config.aws.region,
	accessKeyId: config.aws.accessID,
	secretAccessKey: config.aws.secretKey,

	// Every request in this demo targets the same queue. Registering the QueueUrl here
	// as a default parameter means it does not have to be repeated on each request.
	params: {
		QueueUrl: config.aws.queueUrl
	}
});

// Expose the SQS methods we need as promise-returning functions. Each method is bound
// to the client first (so it keeps its "this"), and then wrapped so that the node.js
// error / callback pattern is translated into a Promise, which makes the sequential
// asynchronous steps below easier to chain.
var receiveMessage = Q.nfbind( sqs.receiveMessage.bind( sqs ) );
var deleteMessage = Q.nfbind( sqs.deleteMessage.bind( sqs ) );
| // ---------------------------------------------------------- // | |
| // ---------------------------------------------------------- // | |
// When pulling messages from Amazon SQS, we can open up a long-poll which will hold open
// until a message is available, for up to 20-seconds. If no message is returned in that
// time period, the request will end "successfully", but without any Messages. At that
// time, we'll want to re-open the long-poll request to listen for more messages. To
// kick off this cycle, we use a named, self-executing function that re-invokes itself
// once each pass (receive -> delete -> error handling) has settled.
(function pollQueueForMessages() {

	console.log( chalk.yellow( "Starting long-poll operation." ) );

	// Pull a message - we're going to keep the long-polling timeout short so as to
	// keep the demo a little bit more interesting.
	receiveMessage({
		WaitTimeSeconds: 3, // Enable long-polling (3-seconds).
		VisibilityTimeout: 10
	})
	.then(
		function handleMessageResolve( data ) {

			// If there are no messages, throw a typed error so that we can bypass the
			// subsequent resolution handler that is expecting to have a message
			// delete confirmation. An empty Messages array is treated the same as a
			// missing one so that we never index into an empty list below.
			if ( ! data.Messages || ! data.Messages.length ) {

				throw(
					workflowError(
						"EmptyQueue",
						new Error( "There are no messages to process." )
					)
				);

			}

			// Only the first returned message is handled in each pass.
			var message = data.Messages[ 0 ];

			// ---
			// TODO: Actually process the message in some way :P
			// ---
			console.log( chalk.green( "Deleting:", message.MessageId ) );

			// Now that we've processed the message, we need to tell SQS to delete the
			// message. Right now, the message is still in the queue, but it is marked
			// as "invisible". If we don't tell SQS to delete the message, SQS will
			// "re-queue" the message when the "VisibilityTimeout" expires such that it
			// can be handled by another receiver.
			return(
				deleteMessage({
					ReceiptHandle: message.ReceiptHandle
				})
			);

		}
	)
	.then(
		function handleDeleteResolve( data ) {

			console.log( chalk.green( "Message Deleted!" ) );

		}
	)
	// Catch any error (or rejection) that took place during processing.
	.catch(
		function handleError( error ) {

			// The error could have occurred for both known (ex, business logic) and
			// unknown reasons (ex, HTTP error, AWS error). As such, we can treat these
			// errors differently based on their type (since I'm setting a custom type
			// for my business logic errors).
			switch ( error.type ) {
				case "EmptyQueue":
					console.log( chalk.cyan( "Expected Error:", error.message ) );
				break;
				default:
					console.log( chalk.red( "Unexpected Error:", error.message ) );
				break;
			}

		}
	)
	// When the promise chain completes, either in success or in error, let's kick the
	// long-poll operation back up and look for more messages.
	.finally( pollQueueForMessages );

})();
// When processing the SQS message, we use errors to help control the flow of the
// resolution and rejection. The "type" property attached here is what the catch handler
// switches on to decide how to process the error object.
//
// type  - Label to attach to the error (ex, "EmptyQueue").
// error - The error to tag. Objects (including Error instances) are tagged in place. A
//         primitive value (ex, a plain string message) cannot hold a property, so it is
//         wrapped in a new Error whose message is the string form of the value; null and
//         undefined produce an Error with an empty message.
//
// Returns the tagged error - the very same object that was passed in whenever the
// argument was already an object.
function workflowError( type, error ) {

	// Object( value ) returns the value itself only when it is already an object (or
	// function), which is exactly the case in which a property can be stored on it.
	var canHoldProperties = ( Object( error ) === error );

	var tagged = canHoldProperties
		? error
		: new Error( ( error == null ) ? "" : String( error ) )
	;

	tagged.type = type;

	return( tagged );

}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Require the demo configuration. This contains settings for this demo, including | |
| // the AWS credentials and target queue settings. | |
| var config = require( "./config.json" ); | |
| // Require libraries. | |
| var aws = require( "aws-sdk" ); | |
| var Q = require( "q" ); | |
| var chalk = require( "chalk" ); | |
// Build the SQS client for this demo from the region and credentials held in the demo
// configuration.
var sqs = new aws.SQS({
	region: config.aws.region,
	accessKeyId: config.aws.accessID,
	secretAccessKey: config.aws.secretKey,

	// Every request in this demo targets the same queue. Registering the QueueUrl here
	// as a default parameter means it does not have to be repeated on each request.
	params: {
		QueueUrl: config.aws.queueUrl
	}
});

// Expose the SQS method we need as a promise-returning function. The method is bound to
// the client first (so it keeps its "this"), and then wrapped so that the node.js
// error / callback pattern is translated into a Promise.
var sendMessage = Q.nfbind( sqs.sendMessage.bind( sqs ) );
| // ---------------------------------------------------------- // | |
| // ---------------------------------------------------------- // | |
// With the promise-returning sendMessage() in hand, push a single message onto the
// queue and report the outcome. The handlers are function declarations (hoisted), so
// they can be referenced here before they appear in the file.
sendMessage({
	MessageBody: "This is my first ever SQS request... evar!"
})
.then( handleSendResolve )
// Catch any error (or rejection) that took place during the send or inside the
// success handler.
.catch( handleReject );

// I log the MessageId carried on the send result.
function handleSendResolve( data ) {

	console.log( chalk.green( "Message sent:", data.MessageId ) );

}

// I log the message of whatever error caused the chain to reject.
function handleReject( error ) {

	console.log( chalk.red( "Unexpected Error:", error.message ) );

}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment