Skip to content

Instantly share code, notes, and snippets.

@calvinlfer
Created June 8, 2024 17:37
Show Gist options
  • Save calvinlfer/7392b0869e6f9d0ceda0950f2926fd0a to your computer and use it in GitHub Desktop.
Save calvinlfer/7392b0869e6f9d0ceda0950f2926fd0a to your computer and use it in GitHub Desktop.
FS2 Kafka Avro4S support (Schema Registry aware)
import com.sksamuel.avro4s.{Decoder, SchemaFor}
import fs2.kafka.{Deserializer, ValueDeserializer, KeyDeserializer}
import fs2.kafka.vulcan.AvroSettings
import cats.effect.{Sync, Resource}
import java.nio.ByteBuffer
import io.confluent.kafka.schemaregistry.avro.AvroSchema
/** FS2 Kafka [[Deserializer]] backed by Avro4s, aware of the Confluent Schema Registry.
  *
  * Bytes are first decoded with the *writer* schema fetched from the registry (via the
  * schema id embedded in the Confluent wire format), then decoded to `A` using the
  * *reader* schema derived by Avro4s.
  */
final class Avro4sDeserializer[A >: Null](
  private val decoder: Decoder[A],
  private val schemaFor: SchemaFor[A]
):
  /** Deserializer for record keys, managed as a [[Resource]] so the underlying Confluent
    * deserializer is closed on release.
    */
  def forKey[F[_]: Sync](settings: AvroSettings[F]): Resource[F, KeyDeserializer[F, A]] =
    create(isKey = true, settings)

  /** Deserializer for record values, managed as a [[Resource]]. */
  def forValue[F[_]: Sync](settings: AvroSettings[F]): Resource[F, ValueDeserializer[F, A]] =
    create(isKey = false, settings)

  private def create[F[_]](isKey: Boolean, settings: AvroSettings[F])(using sync: Sync[F]) =
    val readerSchema = schemaFor.schema
    val avroDecoder  = decoder.decode(readerSchema)
    Resource
      .make(settings.createAvroDeserializer(isKey))((deserializer, _) => sync.delay(deserializer.close()))
      .map: (underlyingDeserializer, schemaRegistryClient) =>
        Deserializer.instance: (topic, _headers, rawBytes) =>
          if rawBytes == null || rawBytes.length == 0 then
            sync.raiseError(new IllegalArgumentException("Invalid Avro record: bytes is null or empty"))
          else
            // Suspend the whole pipeline: the schema-registry lookup is a (client-cached)
            // blocking network call and must not run before the effect is evaluated.
            sync.delay:
              // Confluent wire format: 1 magic byte followed by a 4-byte schema id.
              val writerSchemaId = ByteBuffer.wrap(rawBytes).getInt(1)
              schemaRegistryClient.getSchemaById(writerSchemaId) match
                case avroSchema: AvroSchema =>
                  // deserialize underlying with the writer schema (GenericRecord), then
                  // decode the high-level value with the reader schema
                  avroDecoder(underlyingDeserializer.deserialize(topic, rawBytes, avroSchema.rawSchema()))
                case other =>
                  // Previously fell through to a null schema; fail loudly instead.
                  throw new IllegalArgumentException(
                    s"Schema id $writerSchemaId is not an Avro schema: $other"
                  )
/** Factory for [[Avro4sDeserializer]], summoning the Avro4s instances implicitly. */
object Avro4sDeserializer:
  def deserializer[A >: Null](using decoder: Decoder[A], schemaFor: SchemaFor[A]): Avro4sDeserializer[A] =
    new Avro4sDeserializer[A](decoder, schemaFor)
import com.sksamuel.avro4s.{Encoder, SchemaFor}
import fs2.kafka.{Serializer, KeySerializer, ValueSerializer}
import fs2.kafka.vulcan.AvroSettings
import cats.effect.kernel.Resource
import cats.effect.Sync
/** FS2 Kafka [[Serializer]] backed by Avro4s, aware of the Confluent Schema Registry.
  *
  * Values are encoded to Avro with the writer schema derived by Avro4s, then serialized
  * (registering the schema with the registry) by the underlying Confluent Avro serializer.
  */
final class Avro4sSerializer[A >: Null](
  private val encoder: Encoder[A],
  private val schemaFor: SchemaFor[A]
):
  /** Serializer for record keys, managed as a [[Resource]] so the underlying Confluent
    * serializer is closed on release.
    */
  def forKey[F[_]: Sync](settings: AvroSettings[F]): Resource[F, KeySerializer[F, A]] =
    create(isKey = true, settings)

  /** Serializer for record values, managed as a [[Resource]]. */
  def forValue[F[_]: Sync](settings: AvroSettings[F]): Resource[F, ValueSerializer[F, A]] =
    create(isKey = false, settings)

  private def create[F[_]](isKey: Boolean, settings: AvroSettings[F])(using sync: Sync[F]): Resource[F, Serializer[F, A]] =
    val writerSchema = schemaFor.schema
    val avroEncoder  = encoder.encode(writerSchema)
    Resource
      .make(settings.createAvroSerializer(isKey, Option(writerSchema)))((ser, _) => sync.delay(ser.close()))
      .map: (underlyingAvroSerializer, _) =>
        Serializer.instance: (topic, _headers, value) =>
          // Suspend: serialize may perform a (client-cached) schema-registration network call.
          sync.delay:
            underlyingAvroSerializer.serialize(topic, avroEncoder(value))
/** Factory for [[Avro4sSerializer]], summoning the Avro4s instances implicitly. */
object Avro4sSerializer:
  def serializer[A >: Null](using encoder: Encoder[A], schemaFor: SchemaFor[A]): Avro4sSerializer[A] =
    new Avro4sSerializer(encoder, schemaFor)
@calvinlfer
Copy link
Author

calvinlfer commented Jun 8, 2024

Here's an improved version which makes it more apparent during deserialization that we are going from:
Array[Byte] -> GenericRecord -> A

Where Array[Byte] -> GenericRecord is using the writer schema obtained from the schema registry
And GenericRecord -> A is done using the reader schema defined by Avro4S

package com.digital.growth.optimove.event.normalizer.bet_settlement.adapters.in.kafka

import cats.syntax.functor.*
import com.sksamuel.avro4s.{Decoder, SchemaFor}
import fs2.kafka.{Deserializer, ValueDeserializer, KeyDeserializer}
import fs2.kafka.vulcan.AvroSettings
import cats.effect.{Sync, Resource}
import java.nio.ByteBuffer
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.errors.SerializationException

/** Schema-Registry-aware FS2 Kafka [[Deserializer]] backed by Avro4s.
  *
  * Deserialization proceeds in two explicit steps:
  * `Array[Byte] -> GenericRecord` using the *writer* schema fetched from the registry
  * (schema id taken from the Confluent wire format), then `GenericRecord -> A` using
  * the *reader* schema derived by Avro4s.
  */
final class Avro4sDeserializer[A >: Null](
  private val decoder: Decoder[A],
  private val schemaFor: SchemaFor[A]
):
  /** Deserializer for record keys, managed as a [[Resource]] so the underlying Confluent
    * deserializer is closed on release.
    */
  def forKey[F[_]: Sync](settings: AvroSettings[F]): Resource[F, KeyDeserializer[F, A]] =
    create(isKey = true, settings)

  /** Deserializer for record values, managed as a [[Resource]]. */
  def forValue[F[_]: Sync](settings: AvroSettings[F]): Resource[F, ValueDeserializer[F, A]] =
    create(isKey = false, settings)

  private def create[F[_]](isKey: Boolean, settings: AvroSettings[F])(using sync: Sync[F]) =
    val readerSchema = schemaFor.schema
    val avroDecoder  = decoder.decode(readerSchema)
    Resource
      .make(settings.createAvroDeserializer(isKey))((deserializer, _) => sync.delay(deserializer.close()))
      .map: (underlyingDeserializer, schemaRegistryClient) =>
        Deserializer.instance: (topic, _headers, rawBytes) =>
          if rawBytes == null || rawBytes.length == 0 then
            sync.raiseError(new IllegalArgumentException(s"Invalid Avro record: bytes is null or empty"))
          else
            // Everything below is inside `defer` so the (client-cached, blocking)
            // schema-registry lookup is suspended in F rather than run eagerly.
            sync.defer:
              // Confluent wire format: 1 magic byte followed by a 4-byte schema id.
              val writerSchemaId = ByteBuffer.wrap(rawBytes).getInt(1)
              val writerSchema = schemaRegistryClient.getSchemaById(writerSchemaId) match
                case avroSchema: AvroSchema => avroSchema.rawSchema()
                case other =>
                  // Previously fell through to a null schema; fail loudly instead.
                  throw new SerializationException(s"Schema id $writerSchemaId is not an Avro schema: $other")

              // Array[Byte] -> GenericRecord, using the writer schema
              val underlyingGenericRecord =
                underlyingDeserializer.deserialize(topic, rawBytes, writerSchema) match
                  case record: GenericRecord => sync.pure(record)
                  case other =>
                    sync.raiseError(
                      new SerializationException(
                        s"Unable to deserialize: ${other.getClass.getName} is no Avro GenericRecord, value is '$other'"
                      )
                    )

              // GenericRecord -> A, using the reader schema
              underlyingGenericRecord.map(avroDecoder)
/** Companion providing a summoner that derives the deserializer from given Avro4s instances. */
object Avro4sDeserializer:
  def deserializer[A >: Null](using decoder: Decoder[A], schemaFor: SchemaFor[A]): Avro4sDeserializer[A] =
    new Avro4sDeserializer[A](decoder, schemaFor)

One further improvement is to swap the avro4s Decoder[A] for RecordFormat[A] or ToRecord[A]/FromRecord[A] which is more explicit about going to and from GenericRecord/A as opposed to Decoder[A] which just says Schema => Any => A

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment