Created
June 8, 2024 17:37
-
-
Save calvinlfer/7392b0869e6f9d0ceda0950f2926fd0a to your computer and use it in GitHub Desktop.
FS2 Kafka Avro4S support (Schema Registry aware)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.sksamuel.avro4s.{Decoder, SchemaFor} | |
import fs2.kafka.{Deserializer, ValueDeserializer, KeyDeserializer} | |
import fs2.kafka.vulcan.AvroSettings | |
import cats.effect.{Sync, Resource} | |
import java.nio.ByteBuffer | |
import io.confluent.kafka.schemaregistry.avro.AvroSchema | |
/** Schema Registry aware Avro deserializer for fs2-kafka, backed by avro4s.
  *
  * Decoding is a two-step process: the raw bytes are first deserialized by the underlying Avro
  * deserializer using the writer schema looked up in the Schema Registry, and the resulting value
  * is then decoded into `A` by the avro4s `Decoder` applied to the reader schema from `SchemaFor`.
  *
  * @param decoder   avro4s decoder used for the second step (generic value to `A`)
  * @param schemaFor provider of the reader schema handed to `decoder`
  */
final class Avro4sDeserializer[A >: Null](
  private val decoder: Decoder[A],
  private val schemaFor: SchemaFor[A]
):

  /** Builds a deserializer for record keys from the given Avro settings. */
  def forKey[F[_]: Sync](settings: AvroSettings[F]): Resource[F, KeyDeserializer[F, A]] =
    create(isKey = true, settings)

  /** Builds a deserializer for record values from the given Avro settings. */
  def forValue[F[_]: Sync](settings: AvroSettings[F]): Resource[F, ValueDeserializer[F, A]] =
    create(isKey = false, settings)

  /** Acquires the underlying Avro deserializer (closed again on release) and wraps it.
    *
    * The returned deserializer fails with an `IllegalArgumentException` when the payload is null,
    * empty, or too short to contain the magic byte followed by the 4-byte schema id.
    */
  private def create[F[_]](
    isKey: Boolean,
    settings: AvroSettings[F]
  )(using sync: Sync[F]): Resource[F, Deserializer[F, A]] =
    val readerSchema = schemaFor.schema
    val avroDecoder  = decoder.decode(readerSchema)
    // One magic byte followed by the Int schema id that is read at offset 1 below.
    val headerLength = 5

    Resource
      .make(settings.createAvroDeserializer(isKey))((deserializer, _) => sync.delay(deserializer.close()))
      .map: (underlyingDeserializer, schemaRegistryClient) =>
        Deserializer.instance: (topic, _headers, rawBytes) =>
          if rawBytes == null || rawBytes.length == 0 then
            sync.raiseError[A](new IllegalArgumentException("Invalid Avro record: bytes is null or empty"))
          else if rawBytes.length < headerLength then
            // Without this guard getInt(1) would fail with a bare IndexOutOfBoundsException.
            sync.raiseError[A](
              new IllegalArgumentException(
                s"Invalid Avro record: expected at least $headerLength bytes (magic byte + schema id) but got ${rawBytes.length}"
              )
            )
          else
            // The registry lookup is suspended together with the decoding so that any failure is
            // raised inside F instead of being thrown while the effect is being constructed.
            sync.delay:
              val writerSchemaId = ByteBuffer.wrap(rawBytes).getInt(1) // skip magic byte
              val writerSchema = schemaRegistryClient.getSchemaById(writerSchemaId) match
                case avroSchema: AvroSchema => avroSchema.rawSchema()
                // Non-Avro schema: pass no schema on and leave the decision to the underlying deserializer.
                case _ => null
              // deserialize underlying with the writer schema (GenericRecord)
              val underlyingGenericRecord = underlyingDeserializer.deserialize(topic, rawBytes, writerSchema)
              // decode the high-level with the reader schema
              avroDecoder(underlyingGenericRecord)

object Avro4sDeserializer:

  /** Creates an [[Avro4sDeserializer]] from the `Decoder` and `SchemaFor` instances in scope. */
  def deserializer[A >: Null](using decoder: Decoder[A], schemaFor: SchemaFor[A]): Avro4sDeserializer[A] =
    new Avro4sDeserializer[A](decoder, schemaFor)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.sksamuel.avro4s.{Encoder, SchemaFor} | |
import fs2.kafka.{Serializer, KeySerializer, ValueSerializer} | |
import fs2.kafka.vulcan.AvroSettings | |
import cats.effect.kernel.Resource | |
import cats.effect.Sync | |
/** Schema Registry aware Avro serializer for fs2-kafka, backed by avro4s.
  *
  * A value is first encoded with the avro4s `Encoder` applied to the schema from `SchemaFor`, and
  * the encoded value is then handed to the underlying Avro serializer.
  *
  * @param encoder   avro4s encoder used to turn `A` into its Avro representation
  * @param schemaFor provider of the writer schema
  */
final class Avro4sSerializer[A >: Null](
  private val encoder: Encoder[A],
  private val schemaFor: SchemaFor[A]
):

  /** Builds a serializer for record keys from the given Avro settings. */
  def forKey[F[_]](settings: AvroSettings[F])(using Sync[F]): Resource[F, KeySerializer[F, A]] =
    create(isKey = true, settings)

  /** Builds a serializer for record values from the given Avro settings. */
  def forValue[F[_]: Sync](settings: AvroSettings[F]): Resource[F, ValueSerializer[F, A]] =
    create(isKey = false, settings)

  /** Acquires the underlying Avro serializer (closed again on release) and wraps it. */
  private def create[F[_]](
    isKey: Boolean,
    settings: AvroSettings[F]
  )(using sync: Sync[F]): Resource[F, Serializer[F, A]] =
    val schema  = schemaFor.schema
    val toAvro  = encoder.encode(schema)
    val acquire = settings.createAvroSerializer(isKey, Option(schema))

    Resource
      .make(acquire)((kafkaSerializer, _) => sync.delay(kafkaSerializer.close()))
      .map: (kafkaSerializer, _) =>
        Serializer.instance: (topic, _, value) =>
          // Encode with avro4s first, then let the underlying serializer produce the bytes.
          sync.delay(kafkaSerializer.serialize(topic, toAvro(value)))

object Avro4sSerializer:

  /** Creates an [[Avro4sSerializer]] from the `Encoder` and `SchemaFor` instances in scope. */
  def serializer[A >: Null](using encoder: Encoder[A], schemaFor: SchemaFor[A]): Avro4sSerializer[A] =
    new Avro4sSerializer(encoder, schemaFor)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Here's an improved version which makes it more apparent during deserialization that we are going from:
Array[Byte]
->GenericRecord
->A
Where
Array[Byte]
->GenericRecord
is using the writer schema obtained from the schema registry. And
GenericRecord
->A
is done using the reader schema defined by Avro4S. One further improvement is to swap the avro4s
Decoder[A]
for RecordFormat[A] or ToRecord[A]/FromRecord[A]
which is more explicit about going to and from GenericRecord
/A
as opposed to Decoder[A]
which just says Schema => Any => A