Last active
December 3, 2018 19:42
-
-
Save kdgregory/84fe6fc26ee6424ecc12926565fee304 to your computer and use it in GitHub Desktop.
Examples for AWS messaging talk
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SQS examples | |
java -cp target/aws-comms-1.0.0-SNAPSHOT.jar com.example.aws.sqs.Producer example 3 | |
java -cp target/aws-comms-1.0.0-SNAPSHOT.jar com.example.aws.sqs.Consumer example 1000 | |
# SNS examples | |
java -cp target/aws-comms-1.0.0-SNAPSHOT.jar com.example.aws.sns.SubscribeEmail example [email protected] | |
java -cp target/aws-comms-1.0.0-SNAPSHOT.jar com.example.aws.sns.SubscribeSQS example example | |
java -cp target/aws-comms-1.0.0-SNAPSHOT.jar com.example.aws.sns.Producer example "Sample message" "This is an example" | |
# Kinesis examples | |
java -cp target/aws-comms-1.0.0-SNAPSHOT.jar com.example.aws.kinesis.Producer example 3 | |
java -cp target/aws-comms-1.0.0-SNAPSHOT.jar com.example.aws.kinesis.Consumer example 1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) Keith D Gregory, all rights reserved | |
package com.example.aws.kinesis; | |
import java.util.ArrayList; | |
import java.util.Iterator; | |
import java.util.List; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.amazonaws.services.kinesis.AmazonKinesis; | |
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; | |
import com.amazonaws.services.kinesis.model.*; | |
import com.amazonaws.util.BinaryUtils; | |
/** | |
* Continuously reads a named stream at a specified rate, starting at either a specified | |
* sequence number or the the end. | |
* <p> | |
* Invocation: | |
* <pre> | |
* java -cp UBERJAR com.example.aws.kinesis.KinesisConsumer STREAM_NAME RATE [SEQNUMS] | |
* </pre> | |
* <ul> | |
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code> | |
* <li> <code>STREAM_NAME</code> is the name of the queue (must already exist) | |
* <li> <code>RATE</code> is the number of read attempts per second | |
* <li> <code>SEQNUMS</code> is a space-separated list of sequence numbers, each | |
* corresponding to a shard; if there's no sequence number for a shard we | |
* start reading from the end. | |
* </ul> | |
*/ | |
public class KinesisConsumer | |
{ | |
private final static Logger logger = LoggerFactory.getLogger(KinesisConsumer.class); | |
public static void main(String[] argv) | |
throws Exception | |
{ | |
String streamName = argv[0]; | |
int rate = Integer.parseInt(argv[1]); | |
ArrayList<String> seqnums = new ArrayList<>(); | |
for (int ii = 2 ; ii < argv.length ; ii++) | |
seqnums.add(argv[ii]); | |
AmazonKinesis client = AmazonKinesisClientBuilder.defaultClient(); | |
String[] shardIterators = retrieveInitialShardIterators(client, streamName, seqnums.iterator()); | |
while (true) | |
{ | |
readStream(client, shardIterators); | |
Thread.sleep(1000 / rate); | |
} | |
} | |
private static String[] retrieveInitialShardIterators(AmazonKinesis client, String streamName, Iterator<String> seqnums) | |
{ | |
logger.info("retrieving shard iterators"); | |
DescribeStreamResult describeResult = client.describeStream(streamName); | |
List<Shard> shards = describeResult.getStreamDescription().getShards(); | |
List<String> shardIterators = new ArrayList<>(); | |
for (Shard shard : shards) | |
{ | |
GetShardIteratorRequest shardItxRequest = new GetShardIteratorRequest() | |
.withStreamName(streamName) | |
.withShardId(shard.getShardId()); | |
if (seqnums.hasNext()) | |
{ | |
shardItxRequest.setShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER); | |
shardItxRequest.setStartingSequenceNumber(seqnums.next()); | |
} | |
else | |
{ | |
shardItxRequest.setShardIteratorType(ShardIteratorType.LATEST); | |
} | |
GetShardIteratorResult shardItxResult = client.getShardIterator(shardItxRequest); | |
shardIterators.add(shardItxResult.getShardIterator()); | |
} | |
return shardIterators.toArray(new String[shardIterators.size()]); | |
} | |
private static void readStream(AmazonKinesis client, String[] shardIterators) | |
{ | |
for (int ii = 0 ; ii < shardIterators.length ; ii++) | |
{ | |
logger.info("reading shard {}", ii); | |
GetRecordsRequest request = new GetRecordsRequest() | |
.withShardIterator(shardIterators[ii]); | |
try | |
{ | |
GetRecordsResult result = client.getRecords(request); | |
logger.info("retrieved {} records from shard {}; millis behind = {}", result.getRecords().size(), ii, result.getMillisBehindLatest()); | |
processRecords(result.getRecords()); | |
shardIterators[ii] = result.getNextShardIterator(); | |
} | |
catch (Exception ex) | |
{ | |
logger.warn("exception reading shard: {}", ex.getMessage()); | |
} | |
} | |
} | |
private static void processRecords(List<Record> records) | |
throws Exception | |
{ | |
for (Record record : records) | |
{ | |
byte[] recordData = BinaryUtils.copyAllBytesFrom(record.getData()); | |
String recordStr = new String(recordData, "UTF-8"); | |
logger.info("sequence number: {}, partion key: {}, data: {}", | |
record.getSequenceNumber(), record.getPartitionKey(), recordStr); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) Keith D Gregory, all rights reserved | |
package com.example.aws.kinesis; | |
import java.nio.ByteBuffer; | |
import java.util.ArrayList; | |
import java.util.Iterator; | |
import java.util.List; | |
import org.joda.time.Instant; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.amazonaws.services.kinesis.AmazonKinesis; | |
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; | |
import com.amazonaws.services.kinesis.model.*; | |
/** | |
* Continuously writes batches of JSON messages to a stream using a random partition key. | |
* <p> | |
* Invocation: | |
* <pre> | |
* java -cp UBERJAR com.example.aws.kinesis.KinesisProducer STREAM_NAME MAX_BATCH_SIZE | |
* </pre> | |
* <ul> | |
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code> | |
* <li> <code>STREAM_NAME</code> is the name of the queue (must already exist) | |
* <li> <code>MAX_BATCH_SIZE</code> is the maximum number of messages to send per batch | |
* </ul> | |
*/ | |
public class KinesisProducer | |
{ | |
private final static Logger logger = LoggerFactory.getLogger(KinesisProducer.class); | |
public static void main(String[] argv) | |
throws Exception | |
{ | |
String streamName = argv[0]; | |
int messagesPerBatch = Integer.parseInt(argv[1]); | |
AmazonKinesis client = AmazonKinesisClientBuilder.defaultClient(); | |
for (int batchNum = 0 ; batchNum < Integer.MAX_VALUE ; batchNum++) | |
{ | |
List<PutRecordsRequestEntry> batch = new ArrayList<>(); | |
for (int ii = 0 ; ii < messagesPerBatch ; ii++) | |
{ | |
batch.add(createRecord(batchNum, ii)); | |
// normally you would check the total batch size here | |
} | |
sendBatch(client, streamName, batchNum, batch); | |
Thread.sleep(1000); | |
} | |
} | |
private static PutRecordsRequestEntry createRecord(int batchNum, int messageNum) | |
throws Exception | |
{ | |
String message = "batch " + batchNum + " message " + messageNum + " at " + Instant.now(); | |
ByteBuffer messageBuf = ByteBuffer.wrap(message.getBytes("UTF-8")); | |
return new PutRecordsRequestEntry() | |
.withData(messageBuf) | |
.withPartitionKey(String.valueOf(batchNum)); | |
} | |
private static void sendBatch(AmazonKinesis client, String streamName, int batchNum, List<PutRecordsRequestEntry> batch) | |
throws Exception | |
{ | |
logger.info("sending batch {}, containing {} records", batchNum, batch.size()); | |
PutRecordsRequest request = new PutRecordsRequest() | |
.withStreamName(streamName) | |
.withRecords(batch); | |
PutRecordsResult result = client.putRecords(request); | |
List<PutRecordsRequestEntry> retries = new ArrayList<PutRecordsRequestEntry>(); | |
int internalErrorCount = 0; | |
Iterator<PutRecordsRequestEntry> requestEntryItx = batch.iterator(); | |
Iterator<PutRecordsResultEntry> resultEntryItx = result.getRecords().iterator(); | |
while (resultEntryItx.hasNext()) | |
{ | |
PutRecordsResultEntry resultEntry = resultEntryItx.next(); | |
PutRecordsRequestEntry requestEntry = requestEntryItx.next(); | |
if ("ProvisionedThroughputExceededException".equals(resultEntry.getErrorCode())) | |
retries.add(requestEntry); | |
else if ("InternalFailure".equals(resultEntry.getErrorCode())) | |
internalErrorCount++; | |
} | |
if (internalErrorCount > 0) | |
logger.warn("discarded {} records from batch {} due to internal errors", internalErrorCount, batchNum); | |
if (retries.size() > 0) | |
{ | |
logger.info("retrying {} records from batch {}", retries.size(), batchNum); | |
Thread.sleep(1000); | |
sendBatch(client, streamName, batchNum, retries); | |
} | |
else | |
{ | |
logger.info("finished sending batch {}", batchNum); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) Keith D Gregory, all rights reserved | |
package com.example.aws.sns; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.amazonaws.services.sns.AmazonSNS; | |
import com.amazonaws.services.sns.AmazonSNSClientBuilder; | |
import com.amazonaws.services.sns.model.*; | |
/** | |
* Sends a single message to a named SNS topic. | |
* <p> | |
* Invocation: | |
* <pre> | |
* java -cp UBERJAR com.example.aws.sns.SNSProducer TOPIC_NAME SUBJECT MESSAGE | |
* </pre> | |
* <ul> | |
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code> | |
* <li> <code>TOPIC_NAME</code> is the name of the SNS topic (must already exist) | |
* <li> <code>SUBJECT</code> is a subject for the message | |
* <li> <code>MESSAGE</code> is the message to send | |
* </ul> | |
*/ | |
public class SNSProducer | |
{ | |
private final static Logger logger = LoggerFactory.getLogger(SNSProducer.class); | |
public static void main(String[] argv) | |
throws Exception | |
{ | |
String topicName = argv[0]; | |
String subject = argv[1]; | |
String message = argv[2]; | |
AmazonSNS client = AmazonSNSClientBuilder.defaultClient(); | |
String topicArn = lookupTopicArnFromName(client, topicName); | |
logger.debug("publishing to {}", topicArn); | |
PublishRequest request = new PublishRequest() | |
.withTopicArn(topicArn) | |
.withSubject(subject) | |
.withMessage(message); | |
PublishResult response = client.publish(request); | |
logger.debug("message ID: {}", response.getMessageId()); | |
} | |
private static String lookupTopicArnFromName(AmazonSNS client, String topicName) | |
{ | |
logger.debug("retrieving ARN for topic {}", topicName); | |
ListTopicsRequest request = new ListTopicsRequest(); | |
do | |
{ | |
ListTopicsResult result = client.listTopics(request); | |
for (Topic topic : result.getTopics()) | |
{ | |
if (topic.getTopicArn().endsWith(":" + topicName)) | |
return topic.getTopicArn(); | |
} | |
request.setNextToken(result.getNextToken()); | |
} while (request.getNextToken() != null); | |
throw new IllegalStateException("topic " + topicName + " not found"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) Keith D Gregory, all rights reserved | |
package com.example.aws.sns; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.amazonaws.services.sns.AmazonSNS; | |
import com.amazonaws.services.sns.AmazonSNSClientBuilder; | |
import com.amazonaws.services.sns.model.*; | |
/** | |
* Subscribes an email address to receive notifications from a named SNS topic. | |
* <p> | |
* Invocation: | |
* <pre> | |
* java -cp UBERJAR com.example.aws.sns.SNSSubscribeEmail TOPIC_NAME EMAIL | |
* </pre> | |
* <ul> | |
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code> | |
* <li> <code>TOPIC_NAME</code> is the name of the SNS topic (must already exist) | |
* <li> <code>EMAIL</code> is the email address to subscribe | |
* </ul> | |
*/ | |
public class SNSSubscribeEmail | |
{ | |
private static Logger logger = LoggerFactory.getLogger(SNSSubscribeEmail.class); | |
public static void main(String[] argv) | |
throws Exception | |
{ | |
String topicName = argv[0]; | |
String emailAddress = argv[1]; | |
AmazonSNS client = AmazonSNSClientBuilder.defaultClient(); | |
String topicArn = lookupTopicArnFromName(client, topicName); | |
logger.debug("subscribing {} to topic {}", emailAddress, topicName); | |
client.subscribe(topicArn, "email", emailAddress); | |
} | |
private static String lookupTopicArnFromName(AmazonSNS client, String topicName) | |
{ | |
logger.debug("retrieving ARN for topic {}", topicName); | |
ListTopicsRequest request = new ListTopicsRequest(); | |
do | |
{ | |
ListTopicsResult result = client.listTopics(request); | |
for (Topic topic : result.getTopics()) | |
{ | |
if (topic.getTopicArn().endsWith(":" + topicName)) | |
return topic.getTopicArn(); | |
} | |
request.setNextToken(result.getNextToken()); | |
} while (request.getNextToken() != null); | |
throw new IllegalStateException("topic " + topicName + " not found"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) Keith D Gregory, all rights reserved | |
package com.example.aws.sns; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.amazonaws.services.sns.AmazonSNS; | |
import com.amazonaws.services.sns.AmazonSNSClientBuilder; | |
import com.amazonaws.services.sns.model.*; | |
/** | |
* Subscribes a phone number to receive notifications from a named SNS topic. | |
* <p> | |
* Invocation: | |
* <pre> | |
* java -cp UBERJAR com.example.aws.sns.SNSSubscribePhone TOPIC_NAME PHONE_NUMBER | |
* </pre> | |
* <ul> | |
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code> | |
* <li> <code>TOPIC_NAME</code> is the name of the SNS topic (must already exist) | |
* <li> <code>PHONE_NUMBER</code> is the phone number to receive messages | |
* </ul> | |
*/ | |
public class SNSSubscribePhone | |
{ | |
private static Logger logger = LoggerFactory.getLogger(SNSSubscribePhone.class); | |
public static void main(String[] argv) | |
throws Exception | |
{ | |
String topicName = argv[0]; | |
String phoneNumber = argv[1]; | |
AmazonSNS client = AmazonSNSClientBuilder.defaultClient(); | |
String topicArn = lookupTopicArnFromName(client, topicName); | |
logger.debug("subscribing phone number {} to topic {}", phoneNumber, topicName); | |
client.subscribe(topicArn, "sms", phoneNumber); | |
} | |
private static String lookupTopicArnFromName(AmazonSNS client, String topicName) | |
{ | |
logger.debug("retrieving ARN for topic {}", topicName); | |
ListTopicsRequest request = new ListTopicsRequest(); | |
do | |
{ | |
ListTopicsResult result = client.listTopics(request); | |
for (Topic topic : result.getTopics()) | |
{ | |
if (topic.getTopicArn().endsWith(":" + topicName)) | |
return topic.getTopicArn(); | |
} | |
request.setNextToken(result.getNextToken()); | |
} while (request.getNextToken() != null); | |
throw new IllegalStateException("topic " + topicName + " not found"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) Keith D Gregory, all rights reserved | |
package com.example.aws.sns; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.amazonaws.services.sns.AmazonSNS; | |
import com.amazonaws.services.sns.AmazonSNSClientBuilder; | |
import com.amazonaws.services.sns.model.*; | |
import com.amazonaws.services.sns.util.Topics; | |
import com.amazonaws.services.sqs.AmazonSQS; | |
import com.amazonaws.services.sqs.AmazonSQSClientBuilder; | |
import com.amazonaws.services.sqs.model.*; | |
/** | |
* Subscribes an SQS queue to receive notifications from a named SNS topic. | |
* <p> | |
* Invocation: | |
* <pre> | |
* java -cp UBERJAR com.example.aws.sns.SNSSubscribeSQS TOPIC_NAME QUEUE_NAME | |
* </pre> | |
* <ul> | |
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code> | |
* <li> <code>TOPIC_NAME</code> is the name of the SNS topic (must already exist) | |
* <li> <code>QUEUE_NAME</code> is the name of the SQS queue to receive notifications (must already exist) | |
* </ul> | |
* Note: in the real world you'd use com.amazonaws.services.sns.util.Topics.subscribeQueue() | |
* rather than explicitly creating an access policy and subscribing the queue. | |
*/ | |
public class SNSSubscribeSQS | |
{ | |
private static Logger logger = LoggerFactory.getLogger(SNSSubscribeSQS.class); | |
public static void main(String[] argv) | |
throws Exception | |
{ | |
String topicName = argv[0]; | |
String queueName = argv[1]; | |
AmazonSNS snsClient = AmazonSNSClientBuilder.defaultClient(); | |
AmazonSQS sqsClient = AmazonSQSClientBuilder.defaultClient(); | |
String topicArn = lookupTopicArnFromName(snsClient, topicName); | |
String queueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl(); | |
String queueArn = retrieveQueueAttribute(sqsClient, queueUrl, "QueueArn"); | |
logger.debug("subscribing queue {} to topic {}", queueArn, topicArn); | |
Topics.subscribeQueue(snsClient, sqsClient, topicArn, queueUrl, true); | |
} | |
private static String lookupTopicArnFromName(AmazonSNS client, String topicName) | |
{ | |
logger.debug("retrieving ARN for topic {}", topicName); | |
ListTopicsRequest request = new ListTopicsRequest(); | |
do | |
{ | |
ListTopicsResult result = client.listTopics(request); | |
for (Topic topic : result.getTopics()) | |
{ | |
if (topic.getTopicArn().endsWith(":" + topicName)) | |
return topic.getTopicArn(); | |
} | |
request.setNextToken(result.getNextToken()); | |
} while (request.getNextToken() != null); | |
throw new IllegalStateException("topic " + topicName + " not found"); | |
} | |
private static String retrieveQueueAttribute(AmazonSQS sqsClient, String queueUrl, String attributeName) | |
{ | |
// we need to loop because changing attributes may take up to 60 seconds per doc | |
for (int ii = 0 ; ii < 60 ; ii++) | |
{ | |
GetQueueAttributesRequest attribsRequest = new GetQueueAttributesRequest() | |
.withQueueUrl(queueUrl) | |
.withAttributeNames(attributeName); | |
GetQueueAttributesResult attribsResponse = sqsClient.getQueueAttributes(attribsRequest); | |
String value = attribsResponse.getAttributes().get(attributeName); | |
if ((value != null) && ! value.equals("")) | |
return value; | |
try | |
{ | |
Thread.sleep(1000); | |
} | |
catch (InterruptedException ex) | |
{ | |
throw new RuntimeException("thread was interrupted"); | |
} | |
} | |
throw new IllegalStateException("unable to retrieve queue attribute: " + attributeName); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) Keith D Gregory, all rights reserved | |
package com.example.aws.sqs; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.amazonaws.services.sqs.AmazonSQS; | |
import com.amazonaws.services.sqs.AmazonSQSClientBuilder; | |
import com.amazonaws.services.sqs.model.*; | |
/** | |
* Consumes messages from a named queue, sleeping after each one (to simulate processing time). | |
* <p> | |
* Invocation: | |
* <pre> | |
* java -cp UBERJAR com.example.aws.sqs.SQSConsumer QUEUE_NAME SLEEP_MILLIS | |
* </pre> | |
* <ul> | |
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code> | |
* <li> <code>QUEUE_NAME</code> is the name of the queue (must already exist) | |
* <li> <code>SLEEP_MILLIS</code> is the amount of time to sleep while processing each message | |
* </ul> | |
*/ | |
public class SQSConsumer | |
{ | |
private final static Logger logger = LoggerFactory.getLogger(SQSConsumer.class); | |
public static void main(String[] argv) | |
throws Exception | |
{ | |
String queueName = argv[0]; | |
int sleepTime = Integer.parseInt(argv[1]); | |
AmazonSQS client = AmazonSQSClientBuilder.defaultClient(); | |
String queueUrl = client.getQueueUrl(queueName).getQueueUrl(); | |
while (true) | |
{ | |
try | |
{ | |
ReceiveMessageRequest request = new ReceiveMessageRequest() | |
.withQueueUrl(queueUrl) | |
.withWaitTimeSeconds(5) | |
.withMaxNumberOfMessages(3); | |
ReceiveMessageResult result = client.receiveMessage(request); | |
logger.debug("received {} messages", result.getMessages().size()); | |
for (Message message : result.getMessages()) | |
{ | |
logger.debug("message ID {}: {}", message.getMessageId(), message.getBody()); | |
Thread.sleep(sleepTime); | |
client.deleteMessage(queueName, message.getReceiptHandle()); | |
} | |
} | |
catch (Exception ex) | |
{ | |
logger.warn("unexpected exception: " + ex); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) Keith D Gregory, all rights reserved | |
package com.example.aws.sqs; | |
import java.time.Instant; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.amazonaws.services.sqs.AmazonSQS; | |
import com.amazonaws.services.sqs.AmazonSQSClientBuilder; | |
/** | |
* Continuously writes messages to a named queue. | |
* <p> | |
* Invocation: | |
* <pre> | |
* java -cp UBERJAR com.example.aws.sqs.SQSProducer QUEUE_NAME RATE | |
* </pre> | |
* <ul> | |
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code> | |
* <li> <code>QUEUE_NAME</code> is the name of the queue (must already exist) | |
* <li> <code>RATE</code> is the number of messages to send per second | |
* </ul> | |
*/ | |
public class SQSProducer | |
{ | |
private final static Logger logger = LoggerFactory.getLogger(SQSProducer.class); | |
public static void main(String[] argv) | |
throws Exception | |
{ | |
String queueName = argv[0]; | |
int rate = Integer.parseInt(argv[1]); | |
AmazonSQS client = AmazonSQSClientBuilder.defaultClient(); | |
for (int ii = 0 ; ii < Integer.MAX_VALUE ; ii++) | |
{ | |
String message = "message #" + ii + " sent " + Instant.now().toString(); | |
try | |
{ | |
client.sendMessage(queueName, message); | |
logger.debug(message); | |
Thread.sleep(1000 / rate); | |
} | |
catch (Exception ex) | |
{ | |
logger.warn("unexpected exception: " + ex); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment