Last active
October 1, 2022 04:00
-
-
Save UnquietCode/5717942 to your computer and use it in GitHub Desktop.
Mock AWS SQS implementation which operates in-memory rather than hitting the real SQS.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.amazonaws.AmazonClientException; | |
import com.amazonaws.AmazonServiceException; | |
import com.amazonaws.AmazonWebServiceRequest; | |
import com.amazonaws.ResponseMetadata; | |
import com.amazonaws.regions.Region; | |
import com.amazonaws.services.sqs.AmazonSQS; | |
import com.amazonaws.services.sqs.model.*; | |
import com.google.common.hash.Hashing; | |
import java.util.*; | |
import java.util.concurrent.*; | |
import static com.google.common.base.Preconditions.checkArgument; | |
import static com.google.common.base.Preconditions.checkNotNull; | |
/** | |
* Sitting in the airport, unable to connect to the internet, this seemed | |
* like a good use of my time, at least as compared with sleeping. | |
* | |
* @author Ben Fagin | |
* @version 2013-05-28 | |
*/ | |
public class MockSQS implements AmazonSQS { | |
private final Map<String, Queue<MessageInfo>> queues = new HashMap<>(); | |
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); | |
private int timeout = 35*60; | |
private final Map<String, ScheduledMessage> receivedMessages = new HashMap<>(); | |
/* | |
- adds a message to the correct queue | |
- delays if required | |
*/ | |
@Override | |
public SendMessageResult sendMessage(final SendMessageRequest request) throws AmazonServiceException, AmazonClientException { | |
final Queue<MessageInfo> queue = getOrCreateQueue(request.getQueueUrl()); | |
final MessageInfo info = new MessageInfo(); | |
info.body = checkNotNull(request.getMessageBody()); | |
info.id = UUID.randomUUID().toString(); | |
if (request.getDelaySeconds() == null) { | |
queue.add(info); | |
} else { | |
Runnable task = new Runnable() { | |
public void run() { | |
queue.add(info); | |
} | |
}; | |
executor.schedule(task, request.getDelaySeconds(), TimeUnit.SECONDS); | |
} | |
return new SendMessageResult().withMessageId(info.id).withMD5OfMessageBody(info.hash()); | |
} | |
/* | |
- takes messages off the queue | |
- if timeout, then they are added back | |
*/ | |
@Override | |
public ReceiveMessageResult receiveMessage(ReceiveMessageRequest request) throws AmazonServiceException, AmazonClientException { | |
final Queue<MessageInfo> queue = getOrCreateQueue(request.getQueueUrl()); | |
List<Message> messages = new ArrayList<>(); | |
Integer max = request.getMaxNumberOfMessages(); | |
if (max == null) { max = 0; } | |
checkArgument(max <= 10 && max > 0); | |
Integer visibilityTimeout = request.getVisibilityTimeout(); | |
if (visibilityTimeout == null) { visibilityTimeout = timeout; } | |
for (int i=0; i < max; ++i) { | |
final MessageInfo info = queue.poll(); | |
if (info != null) { | |
final String receiptHandle = UUID.randomUUID().toString(); | |
Message message = new Message(); | |
message.setBody(info.body); | |
message.setMessageId(info.id); | |
message.setMD5OfBody(info.hash()); | |
message.setReceiptHandle(receiptHandle); | |
messages.add(message); | |
Runnable command = new Runnable() { | |
public void run() { | |
queue.add(info); | |
receivedMessages.remove(receiptHandle); | |
} | |
}; | |
ScheduledMessage scheduled = new ScheduledMessage(); | |
scheduled.future = executor.schedule(command, visibilityTimeout, TimeUnit.SECONDS); | |
scheduled.runnable = command; | |
receivedMessages.put(message.getReceiptHandle(), scheduled); | |
} | |
} | |
return new ReceiveMessageResult().withMessages(messages); | |
} | |
/* | |
- deletes the task which would have re-added a message to the queue, | |
effectively deleting the message | |
*/ | |
@Override | |
public void deleteMessage(DeleteMessageRequest request) throws AmazonServiceException, AmazonClientException { | |
ScheduledMessage scheduled = receivedMessages.remove(request.getReceiptHandle()); | |
if (scheduled == null) { | |
throw new RuntimeException("message does not exist"); | |
} | |
scheduled.future.cancel(true); | |
} | |
@Override | |
public void changeMessageVisibility(ChangeMessageVisibilityRequest request) throws AmazonServiceException, AmazonClientException { | |
ScheduledMessage scheduled = receivedMessages.get(request.getReceiptHandle()); | |
if (scheduled == null) { | |
throw new RuntimeException("message does not exist"); | |
} | |
scheduled.future.cancel(true); | |
scheduled.future = executor.schedule(scheduled.runnable, checkNotNull(request.getVisibilityTimeout()), TimeUnit.SECONDS); | |
} | |
@Override | |
public void shutdown() { | |
executor.shutdown(); | |
receivedMessages.clear(); | |
queues.clear(); | |
} | |
private static class MessageInfo { | |
String body; | |
String id; | |
String hash() { | |
return Hashing.md5().hashString(body).toString(); | |
} | |
} | |
private static class ScheduledMessage { | |
ScheduledFuture future; | |
Runnable runnable; | |
} | |
private Queue<MessageInfo> getOrCreateQueue(String url) { | |
Queue<MessageInfo> queue = queues.get(checkNotNull(url)); | |
if (queue == null) { | |
synchronized (queues) { | |
queue = queues.get(checkNotNull(url)); | |
if (queue == null) { | |
queue = new ArrayDeque<>(); | |
queues.put(url, queue); | |
} | |
} | |
} | |
return queue; | |
} | |
/* | |
- set the default timeout when receiving messages from the queue | |
*/ | |
public void setTimeout(int timeout) { | |
this.timeout = timeout; | |
} | |
//---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---// | |
@Override | |
public void setEndpoint(String endpoint) throws IllegalArgumentException { | |
throw new RuntimeException("not implemented"); | |
} | |
@Override | |
public void setRegion(Region region) throws IllegalArgumentException { | |
throw new RuntimeException("not implemented"); | |
} | |
@Override | |
public void setQueueAttributes(SetQueueAttributesRequest setQueueAttributesRequest) throws AmazonServiceException, AmazonClientException { | |
throw new RuntimeException("not implemented"); | |
} | |
@Override | |
public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws AmazonServiceException, AmazonClientException { | |
throw new RuntimeException("not implemented"); | |
} | |
@Override | |
public GetQueueUrlResult getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws AmazonServiceException, AmazonClientException { | |
throw new RuntimeException("not implemented"); | |
} | |
@Override | |
public void removePermission(RemovePermissionRequest removePermissionRequest) throws AmazonServiceException, AmazonClientException { | |
throw new RuntimeException("not implemented"); | |
} | |
@Override | |
public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest) throws AmazonServiceException, AmazonClientException { | |
throw new RuntimeException("not implemented"); | |
} | |
@Override | |
public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) throws AmazonServiceException, AmazonClientException { | |
throw new RuntimeException("not implemented"); | |
} | |
@Override | |
public void deleteQueue(DeleteQueueRequest deleteQueueRequest) throws AmazonServiceException, AmazonClientException { | |
throw new RuntimeException("not implemented"); | |
} | |
@Override | |
public ListQueuesResult listQueues(ListQueuesRequest listQueuesRequest) throws AmazonServiceException, AmazonClientException { | |
throw new RuntimeException("not implemented"); | |
} | |
@Override | |
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws AmazonServiceException, AmazonClientException { | |
throw new RuntimeException("not implemented"); | |
} | |
@Override | |
public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException { | |
throw new RuntimeException("not implemented"); | |
} | |
@Override | |
public void addPermission(AddPermissionRequest addPermissionRequest) throws AmazonServiceException, AmazonClientException { | |
throw new RuntimeException("not implemented"); | |
} | |
@Override | |
public ListQueuesResult listQueues() throws AmazonServiceException, AmazonClientException { | |
throw new RuntimeException("not implemented"); | |
} | |
@Override | |
public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) { | |
throw new RuntimeException("not implemented"); | |
} | |
} |
They probably do end up back at the head of the queue, or else the queue is ordered by time of arrival. Mimicking that here would mean using something like a deque, but for simplicity I've chosen to use a plain Queue instance instead.
Hi!
Just wanted to say THANK YOU VERY MUCH for this one. It really helps us big time. I created our own implementation https://gist.github.com/orvyl/870fba7ddbaeeb6ad651930a1c3c98aa
because we are now using Java 8 and aws-java-sdk v1.11.86; it is 100% based on this one. Again, thank you!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@UnquietCode this is pretty cool. Do you know if Amazon's actual SQS implementation re-enqueues messages back to the end or head of the queue when they haven't been deleted after the visibility timeout expires? (You're using
queue#add
https://gist.github.com/UnquietCode/5717942#file-mocksqs-java-L85-L90 but I would have thought they add to the head but can't find any concrete answer).