Skip to content

Instantly share code, notes, and snippets.

View geomagilles's full-sized avatar

Gilles Barbier geomagilles

View GitHub Profile
@geomagilles
geomagilles / kotlinPulsarConsumerProducer.kt
Last active December 14, 2020 21:35
Building Pulsar Producer<T> and Consumer<T> in Kotlin using native serialization
import com.github.avrokotlin.avro4k.Avro
import com.github.avrokotlin.avro4k.io.AvroEncodeFormat
import io.infinitic.common.tasks.executors.messages.RunTask
import kotlinx.serialization.KSerializer
import org.apache.avro.file.SeekableByteArrayInput
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.Producer
@geomagilles
geomagilles / TaskEngineMessage.kt
Created December 15, 2020 10:14
TaskEngineMessages
/**
 * Sealed parent type for task-engine messages.
 *
 * Every concrete subclass has to supply a [taskId].
 */
@Serializable
sealed class TaskEngineMessage {
    /** Identifier carried by the message; implemented by each subclass. */
    abstract val taskId: TaskId
}
@Serializable
data class DispatchTask(
override val taskId: TaskId,
val taskName: TaskName,
val methodName: MethodName,
@geomagilles
geomagilles / TaskEngineEnvelope.kt
Last active December 15, 2020 10:25
TaskEngineEnvelope
@Serializable
data class TaskEngineEnvelope(
val taskId: TaskId,
val type: TaskEngineMessageType,
val dispatchTask: DispatchTask? = null,
val cancelTask: CancelTask? = null,
val taskCanceled: TaskCanceled? = null,
val taskCompleted: TaskCompleted? = null,
) {
init {
@geomagilles
geomagilles / TaskEngineWorker.kt
Last active December 15, 2020 11:19
Building Workers With Coroutines
/** Shorthand for a [MessageToProcess] whose payload type is [TaskEngineMessage]. */
typealias TaskEngineMessageToProcess = MessageToProcess<TaskEngineMessage>
fun CoroutineScope.startPulsarTaskEngineWorker(
taskEngineConsumer: Consumer<TaskEngineEnvelope>,
taskEngine: TaskEngine,
logChannel: SendChannel<TaskEngineMessageToProcess>?,
enginesNumber: Int
) = launch(Dispatchers.IO) {
val taskInputChannel = Channel<TaskEngineMessageToProcess>()