Created
August 5, 2019 04:45
-
-
Save nubunto/d65e67e2d971ea892e20ce9b9ad0adbc to your computer and use it in GitHub Desktop.
Apache Storm spout implementation for SQS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.your.package; | |
import org.apache.storm.spout.SpoutOutputCollector; | |
import org.apache.storm.task.TopologyContext; | |
import org.apache.storm.topology.base.BaseRichSpout; | |
import org.apache.storm.utils.Utils; | |
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; | |
import software.amazon.awssdk.regions.Region; | |
import software.amazon.awssdk.services.sqs.SqsClient; | |
import software.amazon.awssdk.services.sqs.model.*; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.function.Supplier; | |
public abstract class SqsQueueSpout extends BaseRichSpout { | |
private SpoutOutputCollector collector; | |
private SqsClient sqsClient; | |
private LinkedBlockingQueue<Message> queue; | |
private final String queueURL; | |
private final boolean reliable; | |
private final AwsCredentialsProvider awsCredentialsProvider; | |
private final Supplier<String> awsRegionSupplier; | |
private int sleepTimeInSeconds; | |
public SqsQueueSpout(String queueURL, boolean reliable, AwsCredentialsProvider awsCredentialsProvider, Supplier<String> awsRegionSupplier) { | |
this.queueURL = queueURL; | |
this.reliable = reliable; | |
this.awsCredentialsProvider = awsCredentialsProvider; | |
this.awsRegionSupplier = awsRegionSupplier; | |
this.sleepTimeInSeconds = 30; | |
} | |
@Override | |
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { | |
this.collector = collector; | |
this.queue = new LinkedBlockingQueue<>(); | |
this.sqsClient = SqsClient.builder(). | |
credentialsProvider(awsCredentialsProvider) | |
.region(Region.of(awsRegionSupplier.get())) | |
.build(); | |
} | |
@Override | |
public void nextTuple() { | |
this.pollSqsIfNeeded(); | |
Message message = this.queue.poll(); | |
if (message != null) { | |
String receiptHandle = message.receiptHandle(); | |
if(this.reliable) { | |
this.collector.emit(getStreamID(message), messageToStormTuple(message), receiptHandle); | |
} else { | |
// delete it right away | |
this.deleteMessage(receiptHandle); | |
this.collector.emit(getStreamID(message), messageToStormTuple(message)); | |
} | |
} | |
} | |
@Override | |
public void ack(Object receiptHandle) { | |
this.deleteMessage((String) receiptHandle); | |
} | |
@Override | |
public void fail(Object receiptHandle) { | |
this.changeMessageVisibility((String) receiptHandle); | |
} | |
public abstract List<Object> messageToStormTuple(Message message); | |
public String getStreamID(Message message) { | |
return Utils.DEFAULT_STREAM_ID; | |
} | |
private void changeMessageVisibility(String receiptHandle) { | |
ChangeMessageVisibilityRequest changeMessageVisibilityRequest = ChangeMessageVisibilityRequest.builder() | |
.queueUrl(this.queueURL) | |
.receiptHandle(receiptHandle) | |
.visibilityTimeout(0) | |
.build(); | |
this.sqsClient.changeMessageVisibility(changeMessageVisibilityRequest); | |
} | |
private void deleteMessage(String receiptHandle) { | |
DeleteMessageRequest deleteMessageRequest = DeleteMessageRequest.builder() | |
.queueUrl(this.queueURL) | |
.receiptHandle(receiptHandle) | |
.build(); | |
this.sqsClient.deleteMessage(deleteMessageRequest); | |
} | |
private void pollSqsIfNeeded() { | |
if(this.queue.isEmpty()) { | |
ReceiveMessageResponse receiveMessageResponse = this.pollSqsQueue(); | |
this.queue.addAll(receiveMessageResponse.messages()); | |
} | |
} | |
private ReceiveMessageResponse pollSqsQueue() { | |
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder() | |
.queueUrl(queueURL) | |
.waitTimeSeconds(sleepTimeInSeconds) | |
.build(); | |
return sqsClient.receiveMessage(receiveMessageRequest); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment