Created
November 22, 2021 14:26
-
-
Save Daenyth/e6118cca08ac56e8382b535d1d25676c to your computer and use it in GitHub Desktop.
fs2-kafka vulcan => avro Serde helper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.Sync | |
import cats.syntax.all._ | |
import cats.effect.std.Dispatcher | |
import fs2.kafka.vulcan.{AvroSettings, avroDeserializer, avroSerializer} | |
import fs2.kafka.{Deserializer, Headers, Serializer} | |
import org.apache.kafka.common.serialization.{Serde, Serdes} | |
import vulcan.Codec | |
// Credit to Fede Fernández for the original CE2 version
// Note: this isn't under an open source license; it's for educational purposes only
/** Adapts fs2-kafka's effectful serializers/deserializers to the plain Kafka
  * `org.apache.kafka.common.serialization.Serde` interface, with convenience
  * constructors for Vulcan (Avro) codecs.
  */
object VulcanHelper {

  /** Wraps an fs2-kafka serializer/deserializer pair in a Kafka `Serde`.
    *
    * Every call runs the underlying `F` effect through `disp.unsafeRunSync`,
    * so it blocks the calling thread until the effect completes. Records are
    * always (de)serialized with `Headers.empty`; Kafka record headers are not
    * forwarded to the fs2-kafka instances.
    *
    * Null handling:
    *   - `null` data serializes to a `null` byte array (not an empty one), so
    *     that null values are written as real Kafka tombstones and null keys
    *     stay null on the wire.
    *   - a `null` or empty byte array deserializes to `null`; the empty case
    *     is kept so payloads that were written as empty arrays remain readable.
    *
    * @param serializer   fs2-kafka serializer used for non-null data
    * @param deserializer fs2-kafka deserializer used for non-empty payloads
    * @param disp         dispatcher used to run the `F` effects synchronously
    * @return a Kafka `Serde` backed by the given serializer and deserializer
    */
  def serde[F[_], A](
      serializer: Serializer[F, A],
      deserializer: Deserializer[F, A],
      disp: Dispatcher[F]
  ): Serde[A] =
    Serdes.serdeFrom(
      new org.apache.kafka.common.serialization.Serializer[A] {
        override def serialize(topic: String, data: A): Array[Byte] =
          // Kafka models "no value" as a null payload; an empty array would be
          // stored as a real (zero-length) value and never compacted away.
          if (data == null) null
          else disp.unsafeRunSync(serializer.serialize(topic, Headers.empty, data))
      },
      new org.apache.kafka.common.serialization.Deserializer[A] {
        override def deserialize(topic: String, data: Array[Byte]): A =
          // The Kafka interface is Java-facing, so absence is signalled with null.
          if (data == null || data.isEmpty) null.asInstanceOf[A]
          else disp.unsafeRunSync(deserializer.deserialize(topic, Headers.empty, data))
      }
    )

  /** Builds a Kafka `Serde` for record keys from the Vulcan `Codec[A]` in scope.
    *
    * @param avroSettings settings used to create the Avro key serializer and deserializer
    * @param disp         dispatcher handed to [[serde]] to run effects synchronously
    * @return an effect producing the key `Serde`
    */
  def keySerde[F[_]: Sync, A: Codec](
      avroSettings: AvroSettings[F],
      disp: Dispatcher[F]
  ): F[Serde[A]] =
    (
      avroSerializer[A].using(avroSettings).forKey,
      avroDeserializer[A].using(avroSettings).forKey
    ).mapN(serde[F, A](_, _, disp))

  /** Builds a Kafka `Serde` for record values from the Vulcan `Codec[A]` in scope.
    *
    * @param avroSettings settings used to create the Avro value serializer and deserializer
    * @param disp         dispatcher handed to [[serde]] to run effects synchronously
    * @return an effect producing the value `Serde`
    */
  def valueSerde[F[_]: Sync, A: Codec](
      avroSettings: AvroSettings[F],
      disp: Dispatcher[F]
  ): F[Serde[A]] =
    (
      avroSerializer[A].using(avroSettings).forValue,
      avroDeserializer[A].using(avroSettings).forValue
    ).mapN(serde[F, A](_, _, disp))
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment