Last active
October 11, 2023 00:16
-
-
Save jivimberg/b0f4f94871c6f3e7d17fae1106c28047 to your computer and use it in GitHub Desktop.
SQS Consumer using Kotlin coroutines and pool of workers.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.jivimberg.sqs.published | |
import kotlinx.coroutines.CancellationException | |
import kotlinx.coroutines.CoroutineScope | |
import kotlinx.coroutines.isActive | |
import kotlinx.coroutines.yield | |
import java.lang.Thread.currentThread | |
/**
 * Runs [block] over and over for as long as the receiving scope is active, yielding
 * between iterations.
 *
 * Failures thrown by [block] are logged and the loop retries. Cancellation of the scope
 * is logged and then rethrown so it propagates to the enclosing coroutine instead of
 * being swallowed.
 *
 * @param block the suspending work to repeat.
 * @throws CancellationException when the scope is cancelled while [block] or `yield()` is running.
 */
suspend fun CoroutineScope.repeatUntilCancelled(block: suspend () -> Unit) {
    try {
        while (isActive) {
            try {
                block()
                yield()
            } catch (ex: Exception) {
                if (ex is CancellationException && !isActive) {
                    // The scope itself was cancelled: report it and let cancellation propagate.
                    println("coroutine on ${currentThread().name} cancelled")
                    throw ex
                }
                // Either an ordinary failure, or a cancellation-type exception raised while this
                // scope is still active (so not our own cancellation): log it and retry.
                println("${currentThread().name} failed with $ex. Retrying...")
                ex.printStackTrace()
            }
        }
    } finally {
        // Printed both when the loop ends normally and when cancellation is rethrown.
        println("coroutine on ${currentThread().name} exiting")
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.jivimberg.sqs.published | |
import com.jivimberg.sqs.SQS_URL | |
import kotlinx.coroutines.* | |
import kotlinx.coroutines.channels.Channel | |
import kotlinx.coroutines.channels.ReceiveChannel | |
import kotlinx.coroutines.channels.SendChannel | |
import kotlinx.coroutines.future.await | |
import software.amazon.awssdk.regions.Region | |
import software.amazon.awssdk.services.sqs.SqsAsyncClient | |
import software.amazon.awssdk.services.sqs.model.Message | |
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest | |
import kotlin.coroutines.CoroutineContext | |
/**
 * Sample SQS consumer: a single receiver coroutine polls the queue at `SQS_URL` and hands
 * each message over a channel to a pool of `N_WORKERS` worker coroutines.
 *
 * Every coroutine is launched in this object's scope, which combines [Dispatchers.IO] with
 * one [SupervisorJob]; [stop] cancels that job.
 *
 * @property sqs asynchronous SQS client used to receive, delete and re-schedule messages.
 */
class SqsSampleConsumerChannels(
    private val sqs: SqsAsyncClient
) : CoroutineScope {

    private val supervisorJob = SupervisorJob()

    override val coroutineContext: CoroutineContext
        get() = Dispatchers.IO + supervisorJob

    /**
     * Launches the worker pool and the receiver, all sharing one default-capacity channel.
     *
     * @return the job of the coroutine that parents the workers and the receiver.
     */
    fun start() = launch {
        val messageChannel = Channel<Message>()
        repeat(N_WORKERS) { launchWorker(messageChannel) }
        launchMsgReceiver(messageChannel)
    }

    /** Cancels the supervisor job and, with it, every coroutine started by [start]. */
    fun stop() {
        supervisorJob.cancel()
    }

    /** Launches the coroutine that keeps polling SQS and forwards each message to [channel]. */
    private fun CoroutineScope.launchMsgReceiver(channel: SendChannel<Message>) = launch {
        // The request never changes between polls, so it is built once and reused.
        val receiveRequest = ReceiveMessageRequest.builder()
            .queueUrl(SQS_URL)
            .waitTimeSeconds(20)
            .maxNumberOfMessages(10)
            .build()

        repeatUntilCancelled {
            val messages = sqs.receiveMessage(receiveRequest).await().messages()
            println("${Thread.currentThread().name} Retrieved ${messages.size} messages")
            messages.forEach {
                channel.send(it)
            }
        }
    }

    /**
     * Launches one worker that takes messages from [channel], processes each one and deletes
     * it from the queue. If processing or deletion fails, the failure is logged and the
     * message's visibility timeout is changed instead.
     */
    private fun CoroutineScope.launchWorker(channel: ReceiveChannel<Message>) = launch {
        repeatUntilCancelled {
            for (msg in channel) {
                try {
                    processMsg(msg)
                    deleteMessage(msg)
                } catch (ex: CancellationException) {
                    // Cancellation is shutdown, not a processing failure: do not log it as an
                    // error or issue further SQS calls from a cancelled coroutine.
                    throw ex
                } catch (ex: Exception) {
                    println("${Thread.currentThread().name} exception trying to process message ${msg.body()}")
                    ex.printStackTrace()
                    changeVisibility(msg)
                }
            }
        }
    }

    /** Simulates work on [message] by suspending for a random 1000-2000 ms, logging start and finish. */
    private suspend fun processMsg(message: Message) {
        println("${Thread.currentThread().name} Started processing message: ${message.body()}")
        delay((1000L..2000L).random())
        println("${Thread.currentThread().name} Finished processing of message: ${message.body()}")
    }

    /** Deletes [message] from the queue using its receipt handle and awaits the response. */
    private suspend fun deleteMessage(message: Message) {
        sqs.deleteMessage { req ->
            req.queueUrl(SQS_URL)
            req.receiptHandle(message.receiptHandle())
        }.await()
        println("${Thread.currentThread().name} Message deleted: ${message.body()}")
    }

    /** Sets the visibility timeout of [message] to 10 and awaits the response. */
    private suspend fun changeVisibility(message: Message) {
        sqs.changeMessageVisibility { req ->
            req.queueUrl(SQS_URL)
            req.receiptHandle(message.receiptHandle())
            req.visibilityTimeout(10)
        }.await()
        println("${Thread.currentThread().name} Changed visibility of message: ${message.body()}")
    }
}
/**
 * Demo entry point for the consumer: builds an async SQS client for US_EAST_1, starts a
 * [SqsSampleConsumerChannels] on it, lets it run for 30000 ms and then stops it.
 */
fun main() = runBlocking {
    println("${Thread.currentThread().name} Starting program")

    val client = SqsAsyncClient.builder().region(Region.US_EAST_1).build()

    SqsSampleConsumerChannels(client).run {
        start()
        delay(30000)
        stop()
    }
}
/** Number of worker coroutines launched by `SqsSampleConsumerChannels.start`. */
private const val N_WORKERS: Int = 4
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.jivimberg.sqs.published | |
import com.jivimberg.sqs.SQS_URL
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.SendMessageRequest
/**
 * Demo entry point for the producer: sends "hello world N" messages to the queue at
 * `SQS_URL` forever, with N counting up from 1 and a random 1000-5000 ms pause after
 * each send.
 */
fun main() = runBlocking {
    val client = SqsClient.builder().region(Region.US_EAST_1).build()

    // Endless counter 1, 2, 3, ... — one message is sent per value.
    generateSequence(1) { it + 1 }.forEach { id ->
        val request = SendMessageRequest.builder()
            .queueUrl(SQS_URL)
            .messageBody("hello world $id")
            .build()
        client.sendMessage(request)
        println("Message sent with id: $id")
        delay((1000L..5000L).random())
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment