Skip to content

Instantly share code, notes, and snippets.

@kdgregory
Last active December 3, 2018 19:42
Show Gist options
  • Save kdgregory/84fe6fc26ee6424ecc12926565fee304 to your computer and use it in GitHub Desktop.
Save kdgregory/84fe6fc26ee6424ecc12926565fee304 to your computer and use it in GitHub Desktop.
Examples for AWS messaging talk
# SQS examples
java -cp target/aws-comms-1.0.0-SNAPSHOT.jar com.example.aws.sqs.Producer example 3
java -cp target/aws-comms-1.0.0-SNAPSHOT.jar com.example.aws.sqs.Consumer example 1000
# SNS examples
java -cp target/aws-comms-1.0.0-SNAPSHOT.jar com.example.aws.sns.SubscribeEmail example [email protected]
java -cp target/aws-comms-1.0.0-SNAPSHOT.jar com.example.aws.sns.SubscribeSQS example example
java -cp target/aws-comms-1.0.0-SNAPSHOT.jar com.example.aws.sns.Producer example "Sample message" "This is an example"
# Kinesis examples
java -cp target/aws-comms-1.0.0-SNAPSHOT.jar com.example.aws.kinesis.Producer example 3
java -cp target/aws-comms-1.0.0-SNAPSHOT.jar com.example.aws.kinesis.Consumer example 1
// Copyright (c) Keith D Gregory, all rights reserved
package com.example.aws.kinesis;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.*;
import com.amazonaws.util.BinaryUtils;
/**
* Continuously reads a named stream at a specified rate, starting at either a specified
* sequence number or the the end.
* <p>
* Invocation:
* <pre>
* java -cp UBERJAR com.example.aws.kinesis.KinesisConsumer STREAM_NAME RATE [SEQNUMS]
* </pre>
* <ul>
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code>
* <li> <code>STREAM_NAME</code> is the name of the queue (must already exist)
* <li> <code>RATE</code> is the number of read attempts per second
* <li> <code>SEQNUMS</code> is a space-separated list of sequence numbers, each
* corresponding to a shard; if there's no sequence number for a shard we
* start reading from the end.
* </ul>
*/
public class KinesisConsumer
{
private final static Logger logger = LoggerFactory.getLogger(KinesisConsumer.class);
public static void main(String[] argv)
throws Exception
{
String streamName = argv[0];
int rate = Integer.parseInt(argv[1]);
ArrayList<String> seqnums = new ArrayList<>();
for (int ii = 2 ; ii < argv.length ; ii++)
seqnums.add(argv[ii]);
AmazonKinesis client = AmazonKinesisClientBuilder.defaultClient();
String[] shardIterators = retrieveInitialShardIterators(client, streamName, seqnums.iterator());
while (true)
{
readStream(client, shardIterators);
Thread.sleep(1000 / rate);
}
}
private static String[] retrieveInitialShardIterators(AmazonKinesis client, String streamName, Iterator<String> seqnums)
{
logger.info("retrieving shard iterators");
DescribeStreamResult describeResult = client.describeStream(streamName);
List<Shard> shards = describeResult.getStreamDescription().getShards();
List<String> shardIterators = new ArrayList<>();
for (Shard shard : shards)
{
GetShardIteratorRequest shardItxRequest = new GetShardIteratorRequest()
.withStreamName(streamName)
.withShardId(shard.getShardId());
if (seqnums.hasNext())
{
shardItxRequest.setShardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
shardItxRequest.setStartingSequenceNumber(seqnums.next());
}
else
{
shardItxRequest.setShardIteratorType(ShardIteratorType.LATEST);
}
GetShardIteratorResult shardItxResult = client.getShardIterator(shardItxRequest);
shardIterators.add(shardItxResult.getShardIterator());
}
return shardIterators.toArray(new String[shardIterators.size()]);
}
private static void readStream(AmazonKinesis client, String[] shardIterators)
{
for (int ii = 0 ; ii < shardIterators.length ; ii++)
{
logger.info("reading shard {}", ii);
GetRecordsRequest request = new GetRecordsRequest()
.withShardIterator(shardIterators[ii]);
try
{
GetRecordsResult result = client.getRecords(request);
logger.info("retrieved {} records from shard {}; millis behind = {}", result.getRecords().size(), ii, result.getMillisBehindLatest());
processRecords(result.getRecords());
shardIterators[ii] = result.getNextShardIterator();
}
catch (Exception ex)
{
logger.warn("exception reading shard: {}", ex.getMessage());
}
}
}
private static void processRecords(List<Record> records)
throws Exception
{
for (Record record : records)
{
byte[] recordData = BinaryUtils.copyAllBytesFrom(record.getData());
String recordStr = new String(recordData, "UTF-8");
logger.info("sequence number: {}, partion key: {}, data: {}",
record.getSequenceNumber(), record.getPartitionKey(), recordStr);
}
}
}
// Copyright (c) Keith D Gregory, all rights reserved
package com.example.aws.kinesis;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.*;
/**
* Continuously writes batches of JSON messages to a stream using a random partition key.
* <p>
* Invocation:
* <pre>
* java -cp UBERJAR com.example.aws.kinesis.KinesisProducer STREAM_NAME MAX_BATCH_SIZE
* </pre>
* <ul>
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code>
* <li> <code>STREAM_NAME</code> is the name of the queue (must already exist)
* <li> <code>MAX_BATCH_SIZE</code> is the maximum number of messages to send per batch
* </ul>
*/
public class KinesisProducer
{
private final static Logger logger = LoggerFactory.getLogger(KinesisProducer.class);
public static void main(String[] argv)
throws Exception
{
String streamName = argv[0];
int messagesPerBatch = Integer.parseInt(argv[1]);
AmazonKinesis client = AmazonKinesisClientBuilder.defaultClient();
for (int batchNum = 0 ; batchNum < Integer.MAX_VALUE ; batchNum++)
{
List<PutRecordsRequestEntry> batch = new ArrayList<>();
for (int ii = 0 ; ii < messagesPerBatch ; ii++)
{
batch.add(createRecord(batchNum, ii));
// normally you would check the total batch size here
}
sendBatch(client, streamName, batchNum, batch);
Thread.sleep(1000);
}
}
private static PutRecordsRequestEntry createRecord(int batchNum, int messageNum)
throws Exception
{
String message = "batch " + batchNum + " message " + messageNum + " at " + Instant.now();
ByteBuffer messageBuf = ByteBuffer.wrap(message.getBytes("UTF-8"));
return new PutRecordsRequestEntry()
.withData(messageBuf)
.withPartitionKey(String.valueOf(batchNum));
}
private static void sendBatch(AmazonKinesis client, String streamName, int batchNum, List<PutRecordsRequestEntry> batch)
throws Exception
{
logger.info("sending batch {}, containing {} records", batchNum, batch.size());
PutRecordsRequest request = new PutRecordsRequest()
.withStreamName(streamName)
.withRecords(batch);
PutRecordsResult result = client.putRecords(request);
List<PutRecordsRequestEntry> retries = new ArrayList<PutRecordsRequestEntry>();
int internalErrorCount = 0;
Iterator<PutRecordsRequestEntry> requestEntryItx = batch.iterator();
Iterator<PutRecordsResultEntry> resultEntryItx = result.getRecords().iterator();
while (resultEntryItx.hasNext())
{
PutRecordsResultEntry resultEntry = resultEntryItx.next();
PutRecordsRequestEntry requestEntry = requestEntryItx.next();
if ("ProvisionedThroughputExceededException".equals(resultEntry.getErrorCode()))
retries.add(requestEntry);
else if ("InternalFailure".equals(resultEntry.getErrorCode()))
internalErrorCount++;
}
if (internalErrorCount > 0)
logger.warn("discarded {} records from batch {} due to internal errors", internalErrorCount, batchNum);
if (retries.size() > 0)
{
logger.info("retrying {} records from batch {}", retries.size(), batchNum);
Thread.sleep(1000);
sendBatch(client, streamName, batchNum, retries);
}
else
{
logger.info("finished sending batch {}", batchNum);
}
}
}
// Copyright (c) Keith D Gregory, all rights reserved
package com.example.aws.sns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.*;
/**
* Sends a single message to a named SNS topic.
* <p>
* Invocation:
* <pre>
* java -cp UBERJAR com.example.aws.sns.SNSProducer TOPIC_NAME SUBJECT MESSAGE
* </pre>
* <ul>
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code>
* <li> <code>TOPIC_NAME</code> is the name of the SNS topic (must already exist)
* <li> <code>SUBJECT</code> is a subject for the message
* <li> <code>MESSAGE</code> is the message to send
* </ul>
*/
public class SNSProducer
{
private final static Logger logger = LoggerFactory.getLogger(SNSProducer.class);
public static void main(String[] argv)
throws Exception
{
String topicName = argv[0];
String subject = argv[1];
String message = argv[2];
AmazonSNS client = AmazonSNSClientBuilder.defaultClient();
String topicArn = lookupTopicArnFromName(client, topicName);
logger.debug("publishing to {}", topicArn);
PublishRequest request = new PublishRequest()
.withTopicArn(topicArn)
.withSubject(subject)
.withMessage(message);
PublishResult response = client.publish(request);
logger.debug("message ID: {}", response.getMessageId());
}
private static String lookupTopicArnFromName(AmazonSNS client, String topicName)
{
logger.debug("retrieving ARN for topic {}", topicName);
ListTopicsRequest request = new ListTopicsRequest();
do
{
ListTopicsResult result = client.listTopics(request);
for (Topic topic : result.getTopics())
{
if (topic.getTopicArn().endsWith(":" + topicName))
return topic.getTopicArn();
}
request.setNextToken(result.getNextToken());
} while (request.getNextToken() != null);
throw new IllegalStateException("topic " + topicName + " not found");
}
}
// Copyright (c) Keith D Gregory, all rights reserved
package com.example.aws.sns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.*;
/**
* Subscribes an email address to receive notifications from a named SNS topic.
* <p>
* Invocation:
* <pre>
* java -cp UBERJAR com.example.aws.sns.SNSSubscribeEmail TOPIC_NAME EMAIL
* </pre>
* <ul>
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code>
* <li> <code>TOPIC_NAME</code> is the name of the SNS topic (must already exist)
* <li> <code>EMAIL</code> is the email address to subscribe
* </ul>
*/
public class SNSSubscribeEmail
{
private static Logger logger = LoggerFactory.getLogger(SNSSubscribeEmail.class);
public static void main(String[] argv)
throws Exception
{
String topicName = argv[0];
String emailAddress = argv[1];
AmazonSNS client = AmazonSNSClientBuilder.defaultClient();
String topicArn = lookupTopicArnFromName(client, topicName);
logger.debug("subscribing {} to topic {}", emailAddress, topicName);
client.subscribe(topicArn, "email", emailAddress);
}
private static String lookupTopicArnFromName(AmazonSNS client, String topicName)
{
logger.debug("retrieving ARN for topic {}", topicName);
ListTopicsRequest request = new ListTopicsRequest();
do
{
ListTopicsResult result = client.listTopics(request);
for (Topic topic : result.getTopics())
{
if (topic.getTopicArn().endsWith(":" + topicName))
return topic.getTopicArn();
}
request.setNextToken(result.getNextToken());
} while (request.getNextToken() != null);
throw new IllegalStateException("topic " + topicName + " not found");
}
}
// Copyright (c) Keith D Gregory, all rights reserved
package com.example.aws.sns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.*;
/**
* Subscribes a phone number to receive notifications from a named SNS topic.
* <p>
* Invocation:
* <pre>
* java -cp UBERJAR com.example.aws.sns.SNSSubscribePhone TOPIC_NAME PHONE_NUMBER
* </pre>
* <ul>
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code>
* <li> <code>TOPIC_NAME</code> is the name of the SNS topic (must already exist)
* <li> <code>PHONE_NUMBER</code> is the phone number to receive messages
* </ul>
*/
public class SNSSubscribePhone
{
private static Logger logger = LoggerFactory.getLogger(SNSSubscribePhone.class);
public static void main(String[] argv)
throws Exception
{
String topicName = argv[0];
String phoneNumber = argv[1];
AmazonSNS client = AmazonSNSClientBuilder.defaultClient();
String topicArn = lookupTopicArnFromName(client, topicName);
logger.debug("subscribing phone number {} to topic {}", phoneNumber, topicName);
client.subscribe(topicArn, "sms", phoneNumber);
}
private static String lookupTopicArnFromName(AmazonSNS client, String topicName)
{
logger.debug("retrieving ARN for topic {}", topicName);
ListTopicsRequest request = new ListTopicsRequest();
do
{
ListTopicsResult result = client.listTopics(request);
for (Topic topic : result.getTopics())
{
if (topic.getTopicArn().endsWith(":" + topicName))
return topic.getTopicArn();
}
request.setNextToken(result.getNextToken());
} while (request.getNextToken() != null);
throw new IllegalStateException("topic " + topicName + " not found");
}
}
// Copyright (c) Keith D Gregory, all rights reserved
package com.example.aws.sns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.*;
import com.amazonaws.services.sns.util.Topics;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.*;
/**
* Subscribes an SQS queue to receive notifications from a named SNS topic.
* <p>
* Invocation:
* <pre>
* java -cp UBERJAR com.example.aws.sns.SNSSubscribeSQS TOPIC_NAME QUEUE_NAME
* </pre>
* <ul>
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code>
* <li> <code>TOPIC_NAME</code> is the name of the SNS topic (must already exist)
* <li> <code>QUEUE_NAME</code> is the name of the SQS queue to receive notifications (must already exist)
* </ul>
* Note: in the real world you'd use com.amazonaws.services.sns.util.Topics.subscribeQueue()
* rather than explicitly creating an access policy and subscribing the queue.
*/
public class SNSSubscribeSQS
{
private static Logger logger = LoggerFactory.getLogger(SNSSubscribeSQS.class);
public static void main(String[] argv)
throws Exception
{
String topicName = argv[0];
String queueName = argv[1];
AmazonSNS snsClient = AmazonSNSClientBuilder.defaultClient();
AmazonSQS sqsClient = AmazonSQSClientBuilder.defaultClient();
String topicArn = lookupTopicArnFromName(snsClient, topicName);
String queueUrl = sqsClient.getQueueUrl(queueName).getQueueUrl();
String queueArn = retrieveQueueAttribute(sqsClient, queueUrl, "QueueArn");
logger.debug("subscribing queue {} to topic {}", queueArn, topicArn);
Topics.subscribeQueue(snsClient, sqsClient, topicArn, queueUrl, true);
}
private static String lookupTopicArnFromName(AmazonSNS client, String topicName)
{
logger.debug("retrieving ARN for topic {}", topicName);
ListTopicsRequest request = new ListTopicsRequest();
do
{
ListTopicsResult result = client.listTopics(request);
for (Topic topic : result.getTopics())
{
if (topic.getTopicArn().endsWith(":" + topicName))
return topic.getTopicArn();
}
request.setNextToken(result.getNextToken());
} while (request.getNextToken() != null);
throw new IllegalStateException("topic " + topicName + " not found");
}
private static String retrieveQueueAttribute(AmazonSQS sqsClient, String queueUrl, String attributeName)
{
// we need to loop because changing attributes may take up to 60 seconds per doc
for (int ii = 0 ; ii < 60 ; ii++)
{
GetQueueAttributesRequest attribsRequest = new GetQueueAttributesRequest()
.withQueueUrl(queueUrl)
.withAttributeNames(attributeName);
GetQueueAttributesResult attribsResponse = sqsClient.getQueueAttributes(attribsRequest);
String value = attribsResponse.getAttributes().get(attributeName);
if ((value != null) && ! value.equals(""))
return value;
try
{
Thread.sleep(1000);
}
catch (InterruptedException ex)
{
throw new RuntimeException("thread was interrupted");
}
}
throw new IllegalStateException("unable to retrieve queue attribute: " + attributeName);
}
}
// Copyright (c) Keith D Gregory, all rights reserved
package com.example.aws.sqs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.*;
/**
* Consumes messages from a named queue, sleeping after each one (to simulate processing time).
* <p>
* Invocation:
* <pre>
* java -cp UBERJAR com.example.aws.sqs.SQSConsumer QUEUE_NAME SLEEP_MILLIS
* </pre>
* <ul>
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code>
* <li> <code>QUEUE_NAME</code> is the name of the queue (must already exist)
* <li> <code>SLEEP_MILLIS</code> is the amount of time to sleep while processing each message
* </ul>
*/
public class SQSConsumer
{
private final static Logger logger = LoggerFactory.getLogger(SQSConsumer.class);
public static void main(String[] argv)
throws Exception
{
String queueName = argv[0];
int sleepTime = Integer.parseInt(argv[1]);
AmazonSQS client = AmazonSQSClientBuilder.defaultClient();
String queueUrl = client.getQueueUrl(queueName).getQueueUrl();
while (true)
{
try
{
ReceiveMessageRequest request = new ReceiveMessageRequest()
.withQueueUrl(queueUrl)
.withWaitTimeSeconds(5)
.withMaxNumberOfMessages(3);
ReceiveMessageResult result = client.receiveMessage(request);
logger.debug("received {} messages", result.getMessages().size());
for (Message message : result.getMessages())
{
logger.debug("message ID {}: {}", message.getMessageId(), message.getBody());
Thread.sleep(sleepTime);
client.deleteMessage(queueName, message.getReceiptHandle());
}
}
catch (Exception ex)
{
logger.warn("unexpected exception: " + ex);
}
}
}
}
// Copyright (c) Keith D Gregory, all rights reserved
package com.example.aws.sqs;
import java.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
/**
* Continuously writes messages to a named queue.
* <p>
* Invocation:
* <pre>
* java -cp UBERJAR com.example.aws.sqs.SQSProducer QUEUE_NAME RATE
* </pre>
* <ul>
* <li> <code>UBERJAR</code> is the JAR built by <code>mvn package</code>
* <li> <code>QUEUE_NAME</code> is the name of the queue (must already exist)
* <li> <code>RATE</code> is the number of messages to send per second
* </ul>
*/
public class SQSProducer
{
private final static Logger logger = LoggerFactory.getLogger(SQSProducer.class);
public static void main(String[] argv)
throws Exception
{
String queueName = argv[0];
int rate = Integer.parseInt(argv[1]);
AmazonSQS client = AmazonSQSClientBuilder.defaultClient();
for (int ii = 0 ; ii < Integer.MAX_VALUE ; ii++)
{
String message = "message #" + ii + " sent " + Instant.now().toString();
try
{
client.sendMessage(queueName, message);
logger.debug(message);
Thread.sleep(1000 / rate);
}
catch (Exception ex)
{
logger.warn("unexpected exception: " + ex);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment