Last active
December 14, 2020 21:35
-
-
Save geomagilles/7415047bd224102334f56da88bce3d2b to your computer and use it in GitHub Desktop.
Building Pulsar Producer<T> and Consumer<T> in Kotlin using native serialization
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.github.avrokotlin.avro4k.Avro | |
import com.github.avrokotlin.avro4k.io.AvroEncodeFormat | |
import io.infinitic.common.tasks.executors.messages.RunTask | |
import kotlinx.serialization.KSerializer | |
import org.apache.avro.file.SeekableByteArrayInput | |
import org.apache.avro.generic.GenericDatumReader | |
import org.apache.avro.generic.GenericRecord | |
import org.apache.avro.io.DecoderFactory | |
import org.apache.pulsar.client.api.Consumer | |
import org.apache.pulsar.client.api.Producer | |
import org.apache.pulsar.client.api.PulsarClient | |
import org.apache.pulsar.client.api.Schema | |
import org.apache.pulsar.client.api.schema.SchemaDefinition | |
import org.apache.pulsar.client.api.schema.SchemaReader | |
import org.apache.pulsar.client.api.schema.SchemaWriter | |
import java.io.ByteArrayOutputStream | |
import java.io.InputStream | |
/**
 * Encodes [t] as Avro schemaless binary data.
 *
 * The Avro schema is derived from [serializer] through avro4k, and the output
 * stream is configured with [AvroEncodeFormat.Binary]. Encoded bytes are
 * collected in an in-memory buffer.
 *
 * @param t the instance to encode
 * @param serializer the kotlinx.serialization serializer for [T]
 * @return the bytes written by the Avro output stream
 */
fun <T : Any> writeBinary(t: T, serializer: KSerializer<T>): ByteArray {
    val buffer = ByteArrayOutputStream()
    val streamBuilder = Avro.default.openOutputStream(serializer) {
        encodeFormat = AvroEncodeFormat.Binary
        schema = Avro.default.schema(serializer)
    }
    val avroStream = streamBuilder.to(buffer)
    // Close after writing so everything is pushed into the buffer before it is read.
    avroStream.write(t).close()
    return buffer.toByteArray()
}
/**
 * Decodes an Avro schemaless byte array into an instance of [T].
 *
 * The bytes are first read into a [GenericRecord] using the Avro schema that
 * avro4k derives from [serializer]; that record is then converted to [T].
 *
 * @param bytes the Avro binary payload
 * @param serializer the kotlinx.serialization serializer for [T]
 * @return the decoded instance
 */
fun <T> readBinary(bytes: ByteArray, serializer: KSerializer<T>): T {
    val avroSchema = Avro.default.schema(serializer)
    val recordReader = GenericDatumReader<GenericRecord>(avroSchema)
    val binaryDecoder = DecoderFactory.get().binaryDecoder(SeekableByteArrayInput(bytes), null)
    val record = recordReader.read(null, binaryDecoder)
    return Avro.default.fromRecord(serializer, record)
}
/**
 * Custom Pulsar [SchemaReader] that decodes [RunTask] messages with [readBinary].
 */
class RunTaskSchemaReader : SchemaReader<RunTask> {
    /**
     * Decodes the [length] bytes of [bytes] starting at [offset] by delegating
     * to the [InputStream] overload.
     */
    override fun read(bytes: ByteArray, offset: Int, length: Int): RunTask {
        return read(bytes.inputStream(offset, length))
    }

    /** Reads all remaining bytes of [inputStream] and decodes them as a [RunTask]. */
    override fun read(inputStream: InputStream): RunTask {
        val payload = inputStream.readBytes()
        return readBinary(payload, RunTask.serializer())
    }
}
/**
 * Custom Pulsar [SchemaWriter] that encodes [RunTask] messages with [writeBinary].
 */
class RunTaskSchemaWriter : SchemaWriter<RunTask> {
    /** Encodes [message] into a byte array using the [RunTask] serializer. */
    override fun write(message: RunTask): ByteArray {
        return writeBinary(message, RunTask.serializer())
    }
}
/**
 * Builds the Pulsar [SchemaDefinition] for [RunTask].
 *
 * The JSON definition is the string form of the Avro schema that avro4k derives
 * from the [RunTask] serializer. Reading and writing are delegated to
 * [RunTaskSchemaReader] and [RunTaskSchemaWriter], and schema versioning
 * support is switched on.
 */
fun runTaskSchemaDefinition(): SchemaDefinition<RunTask> {
    val avroSchemaJson = Avro.default.schema(RunTask.serializer()).toString()

    return SchemaDefinition.builder<RunTask>()
        .withJsonDef(avroSchemaJson)
        .withSchemaReader(RunTaskSchemaReader())
        .withSchemaWriter(RunTaskSchemaWriter())
        .withSupportSchemaVersioning(true)
        .build()
}
/**
 * Creates a Pulsar [Producer] of [RunTask] messages.
 *
 * The producer uses an AVRO schema built from [runTaskSchemaDefinition].
 *
 * @param client the Pulsar client used to build the producer
 * @param topic the topic to publish to; defaults to the topic that was
 *   previously hard-coded, so existing single-argument callers are unaffected
 * @return the created producer
 */
@JvmOverloads
fun runTaskProducer(client: PulsarClient, topic: String = "some-avro-topic"): Producer<RunTask> =
    client
        .newProducer(Schema.AVRO(runTaskSchemaDefinition()))
        .topic(topic)
        .create()
/**
 * Creates a Pulsar [Consumer] of [RunTask] messages.
 *
 * The consumer uses an AVRO schema built from [runTaskSchemaDefinition].
 *
 * @param client the Pulsar client used to build the consumer
 * @param topic the topic to subscribe to; defaults to the topic that was
 *   previously hard-coded
 * @param subscriptionName the subscription name to register; the Pulsar client
 *   rejects `subscribe()` when no subscription name has been set on the
 *   builder, so one is always supplied here
 * @return the subscribed consumer
 */
@JvmOverloads
fun runTaskConsumer(
    client: PulsarClient,
    topic: String = "some-avro-topic",
    subscriptionName: String = "some-avro-subscription"
): Consumer<RunTask> =
    client
        .newConsumer(Schema.AVRO(runTaskSchemaDefinition()))
        .topic(topic)
        // Required by the Pulsar consumer builder before subscribe() can succeed.
        .subscriptionName(subscriptionName)
        .subscribe()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment