Created
February 28, 2017 08:33
-
-
Save orvyl/870fba7ddbaeeb6ad651930a1c3c98aa to your computer and use it in GitHub Desktop.
An AmazonSQS implementation that keeps messages in memory instead of using an actual AWS queue (written against aws-java-sdk v1.11.86).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ResponseMetadata;
import com.amazonaws.regions.Region;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.*;
import com.google.common.hash.Hashing;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
/** | |
* Created by oftumaneng on 16/02/2017. | |
* REF: https://gist.github.com/UnquietCode/5717942 | |
* | |
*/ | |
public class MockAmazonSQS implements AmazonSQS { | |
private static final int DEFAULT_VISIBILITY_TIMEOUT=30; | |
public static final String QUEUE_BASE_URL = "http://sqs.aws.com/"; | |
private final Map<String, Queue<MessageInfo>> queues = new HashMap<>(); | |
private final Map<String, ScheduledMessage> receivedMessages = new HashMap<>(); | |
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); | |
@Override | |
public SendMessageResult sendMessage(SendMessageRequest sendMessageRequest) { | |
final Queue<MessageInfo> queue = getOrCreateQueue(sendMessageRequest.getQueueUrl()); | |
MessageInfo info = new MessageInfo(); | |
info.id = UUID.randomUUID().toString(); | |
info.body = sendMessageRequest.getMessageBody(); | |
if (Objects.nonNull(sendMessageRequest.getDelaySeconds())) { | |
executor.schedule((Runnable) () -> queue.add(info), sendMessageRequest.getDelaySeconds(), TimeUnit.SECONDS); | |
} else { | |
queue.add(info); | |
} | |
return new SendMessageResult().withMessageId(info.id).withMD5OfMessageBody(info.hash()); | |
} | |
@Override | |
public SendMessageResult sendMessage(String queueUrl, String messageBody) { | |
return sendMessage(new SendMessageRequest(queueUrl, messageBody)); | |
} | |
@Override | |
public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) { | |
final Queue<MessageInfo> queue = getOrCreateQueue(receiveMessageRequest.getQueueUrl()); | |
List<Message> messages = new ArrayList<>(); | |
Integer maxMessage = Objects.nonNull(receiveMessageRequest.getMaxNumberOfMessages()) | |
? receiveMessageRequest.getMaxNumberOfMessages() : 1; | |
Integer visibilityTimeout = Objects.nonNull(receiveMessageRequest.getVisibilityTimeout()) | |
? receiveMessageRequest.getVisibilityTimeout() : DEFAULT_VISIBILITY_TIMEOUT; | |
final Integer waitTimeSeconds = Objects.nonNull(receiveMessageRequest.getWaitTimeSeconds()) | |
? receiveMessageRequest.getWaitTimeSeconds() : 0; | |
int waitSecondsCtr = 0; | |
do { | |
final MessageInfo info = queue.poll(); | |
waitSecondsCtr++; | |
if (Objects.isNull(info)) continue; | |
final String receiptHandle = UUID.randomUUID().toString(); | |
Message message = new Message(); | |
message.setBody(info.body); | |
message.setMessageId(info.id); | |
message.setMD5OfBody(info.hash()); | |
message.setReceiptHandle(receiptHandle); | |
messages.add(message); | |
ScheduledMessage scheduled = new ScheduledMessage(); | |
final Runnable command = () -> { | |
queue.add(info); | |
receivedMessages.remove(receiptHandle); | |
}; | |
scheduled.future = executor.schedule(command, visibilityTimeout, TimeUnit.SECONDS); | |
scheduled.runnable = command; | |
receivedMessages.put(message.getReceiptHandle(), scheduled); | |
if (messages.size() == maxMessage) break; | |
} while (waitTimeSeconds > waitSecondsCtr); | |
return new ReceiveMessageResult().withMessages(messages); | |
} | |
@Override | |
public ReceiveMessageResult receiveMessage(String queueUrl) { | |
return receiveMessage(new ReceiveMessageRequest(queueUrl)); | |
} | |
@Override | |
public DeleteMessageResult deleteMessage(DeleteMessageRequest deleteMessageRequest) { | |
ScheduledMessage scheduled = receivedMessages.remove(deleteMessageRequest.getReceiptHandle()); | |
if (scheduled == null) { | |
throw new RuntimeException("message does not exist"); | |
} | |
scheduled.future.cancel(true); | |
return new DeleteMessageResult(); | |
} | |
@Override | |
public DeleteMessageResult deleteMessage(String queueUrl, String receiptHandle) { | |
return deleteMessage(new DeleteMessageRequest(queueUrl, receiptHandle)); | |
} | |
@Override | |
public GetQueueUrlResult getQueueUrl(GetQueueUrlRequest request) { | |
final GetQueueUrlResult result = new GetQueueUrlResult(); | |
final String url = QUEUE_BASE_URL + request.getQueueName(); | |
getOrCreateQueue(url); | |
result.setQueueUrl(url); | |
return result; | |
} | |
@Override | |
public GetQueueUrlResult getQueueUrl(String queueName) { | |
return getQueueUrl(new GetQueueUrlRequest(queueName)); | |
} | |
@Override | |
public void shutdown() { | |
executor.shutdown(); | |
receivedMessages.clear(); | |
queues.clear(); | |
} | |
private static class MessageInfo { | |
String id; | |
String body; | |
String hash() { | |
return Hashing.md5().hashString(body, Charset.defaultCharset()).toString(); | |
} | |
} | |
private synchronized Queue<MessageInfo> getOrCreateQueue(String url) { | |
Queue<MessageInfo> queue = queues.get(checkNotNull(url)); | |
if (Objects.isNull(queue)) { | |
queue = new ArrayDeque<>(); | |
queues.put(url, queue); | |
} | |
return queue; | |
} | |
private static class ScheduledMessage { | |
ScheduledFuture future; | |
Runnable runnable; | |
} | |
public Map<String, Queue<MessageInfo>> getQueuesInHeap() { | |
return queues; | |
} | |
public Map<String, ScheduledMessage> getReceivedMessagesInHeap() { | |
return receivedMessages; | |
} | |
/*--Not needed in my testing--*/ | |
@Override | |
public void setEndpoint(String endpoint) { | |
} | |
@Override | |
public void setRegion(Region region) { | |
} | |
@Override | |
public AddPermissionResult addPermission(AddPermissionRequest addPermissionRequest) { | |
return null; | |
} | |
@Override | |
public AddPermissionResult addPermission(String queueUrl, String label, List<String> aWSAccountIds, List<String> actions) { | |
return null; | |
} | |
@Override | |
public ChangeMessageVisibilityResult changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) { | |
return null; | |
} | |
@Override | |
public ChangeMessageVisibilityResult changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout) { | |
return null; | |
} | |
@Override | |
public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) { | |
return null; | |
} | |
@Override | |
public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(String queueUrl, List<ChangeMessageVisibilityBatchRequestEntry> entries) { | |
return null; | |
} | |
@Override | |
public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) { | |
return null; | |
} | |
@Override | |
public CreateQueueResult createQueue(String queueName) { | |
return null; | |
} | |
@Override | |
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) { | |
return null; | |
} | |
@Override | |
public DeleteMessageBatchResult deleteMessageBatch(String queueUrl, List<DeleteMessageBatchRequestEntry> entries) { | |
return null; | |
} | |
@Override | |
public DeleteQueueResult deleteQueue(DeleteQueueRequest deleteQueueRequest) { | |
return null; | |
} | |
@Override | |
public DeleteQueueResult deleteQueue(String queueUrl) { | |
return null; | |
} | |
@Override | |
public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest) { | |
return null; | |
} | |
@Override | |
public GetQueueAttributesResult getQueueAttributes(String queueUrl, List<String> attributeNames) { | |
return null; | |
} | |
@Override | |
public ListDeadLetterSourceQueuesResult listDeadLetterSourceQueues(ListDeadLetterSourceQueuesRequest listDeadLetterSourceQueuesRequest) { | |
return null; | |
} | |
@Override | |
public ListQueuesResult listQueues(ListQueuesRequest listQueuesRequest) { | |
return null; | |
} | |
@Override | |
public ListQueuesResult listQueues() { | |
return null; | |
} | |
@Override | |
public ListQueuesResult listQueues(String queueNamePrefix) { | |
return null; | |
} | |
@Override | |
public PurgeQueueResult purgeQueue(PurgeQueueRequest purgeQueueRequest) { | |
return null; | |
} | |
@Override | |
public RemovePermissionResult removePermission(RemovePermissionRequest removePermissionRequest) { | |
return null; | |
} | |
@Override | |
public RemovePermissionResult removePermission(String queueUrl, String label) { | |
return null; | |
} | |
@Override | |
public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) { | |
return null; | |
} | |
@Override | |
public SendMessageBatchResult sendMessageBatch(String queueUrl, List<SendMessageBatchRequestEntry> entries) { | |
return null; | |
} | |
@Override | |
public SetQueueAttributesResult setQueueAttributes(SetQueueAttributesRequest setQueueAttributesRequest) { | |
return null; | |
} | |
@Override | |
public SetQueueAttributesResult setQueueAttributes(String queueUrl, Map<String, String> attributes) { | |
return null; | |
} | |
@Override | |
public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) { | |
return null; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Wow! This is very useful. Thanks for sharing this ^_^