Last active
December 15, 2020 11:19
-
-
Save geomagilles/141306a13ebe44053bdb307b3de59aa7 to your computer and use it in GitHub Desktop.
Building Workers With Coroutines
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** A [MessageToProcess] whose payload is a [TaskEngineMessage]. */
typealias TaskEngineMessageToProcess = MessageToProcess<TaskEngineMessage>
/**
 * Launches a task-engine worker on [Dispatchers.IO].
 *
 * The worker is a pipeline of child coroutines connected by two channels
 * (both created with the default [Channel] capacity):
 *
 * 1. one "puller" that receives messages from [taskEngineConsumer], decodes each payload
 *    into a [TaskEngineEnvelope] and sends the wrapped message to the input channel;
 * 2. [enginesNumber] "engine" coroutines that read the input channel, call `taskEngine.handle`
 *    on each message, store either the output or the thrown exception on the message, and
 *    send it to the results channel;
 * 3. one "acknowledger" that reads the results channel, acknowledges messages without a
 *    recorded exception, negatively acknowledges the others, and forwards every message to
 *    [logChannel] when it is not null.
 *
 * @param taskEngineConsumer Pulsar consumer the messages are received from and (n)acked on.
 * @param taskEngine engine whose `handle` is called for every received message.
 * @param logChannel optional channel that receives every message after it was (n)acked.
 * @param enginesNumber number of engine coroutines to start.
 * @return the [kotlinx.coroutines.Job] returned by `launch` for the whole worker.
 */
fun CoroutineScope.startPulsarTaskEngineWorker(
    taskEngineConsumer: Consumer<TaskEngineEnvelope>,
    taskEngine: TaskEngine,
    logChannel: SendChannel<TaskEngineMessageToProcess>?,
    enginesNumber: Int
) = launch(Dispatchers.IO) {
    val taskInputChannel = Channel<TaskEngineMessageToProcess>()
    val taskResultsChannel = Channel<TaskEngineMessageToProcess>()

    // coroutine dedicated to pulsar message pulling
    launch(CoroutineName("task-engine-message-puller")) {
        while (isActive) {
            val message: Message<TaskEngineEnvelope> = taskEngineConsumer.receiveAsync().await()

            try {
                val envelope = readBinary(message.data, TaskEngineEnvelope.serializer())
                taskInputChannel.send(MessageToProcess(envelope.message(), message.messageId))
            } catch (e: Exception) {
                // The message could not be decoded or handed over to an engine:
                // nack it, then rethrow so the failure is not hidden (this ends the puller).
                taskEngineConsumer.negativeAcknowledge(message.messageId)
                throw e
            }
        }
    }

    // coroutines dedicated to Task Engine
    repeat(enginesNumber) { engineIndex ->
        launch(CoroutineName("task-engine-$engineIndex")) {
            for (messageToProcess in taskInputChannel) {
                try {
                    messageToProcess.output = taskEngine.handle(messageToProcess.message)
                } catch (e: Exception) {
                    // A CancellationException raised because this coroutine is being cancelled
                    // is not a task failure: propagate it instead of recording it, so the
                    // message is not reported (and possibly nacked) as failed during shutdown.
                    // A cancellation-type exception raised while we are still active is kept
                    // as a regular failure so the engine coroutine keeps running.
                    if (e is kotlinx.coroutines.CancellationException && !isActive) throw e
                    messageToProcess.exception = e
                }
                taskResultsChannel.send(messageToProcess)
            }
        }
    }

    // coroutine dedicated to pulsar message acknowledging
    launch(CoroutineName("task-engine-message-acknowledger")) {
        for (messageToProcess in taskResultsChannel) {
            if (messageToProcess.exception == null) {
                taskEngineConsumer.acknowledgeAsync(messageToProcess.messageId).await()
            } else {
                taskEngineConsumer.negativeAcknowledge(messageToProcess.messageId)
            }
            // Forward the outcome (success or failure) only after it was (n)acked.
            logChannel?.send(messageToProcess)
        }
    }
}
/**
 * A received message together with the outcome of its processing.
 *
 * [exception] and [output] are mutable so that the result of processing can be
 * recorded on the same instance after it was created.
 *
 * @param T type of the wrapped message.
 * @property message the message to process.
 * @property messageId identifier of the underlying message, used to (negatively) acknowledge it.
 * @property exception exception recorded while processing the message; `null` by default.
 * @property output value produced by processing the message; `null` by default.
 */
data class MessageToProcess<T>(
    val message: T,
    val messageId: MessageId,
    var exception: Exception? = null,
    var output: Any? = null
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment