Last active
June 16, 2020 07:50
-
-
Save romanskie/8833755de7ade22599e1c748c96fad11 to your computer and use it in GitHub Desktop.
A sample Kafka JSON SerDe written in Scala using the Circe JSON library.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util

import scala.util.Try

import io.circe.parser._
import io.circe.syntax._
import io.circe.{Decoder, Encoder, _}
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
/** Kafka `Serde` that reads and writes values of type `T` as compact UTF-8 JSON via circe.
  *
  * `null` is passed through unchanged in both directions: a `null` value serializes to a
  * `null` byte array, and a `null` byte array deserializes to a `null` value.
  *
  * @tparam T value type; requires a circe `Encoder` and `Decoder` in implicit scope and must
  *           admit `null` (`T >: Null`) so the pass-through above is expressible
  */
final class KafkaJsonSerde[T >: Null: Encoder: Decoder] extends Serde[T] with SerdeDefaults {

  /** Serializer producing whitespace-free JSON encoded as UTF-8.
    *
    * Throws `SerializationException` (wrapping the original cause) if encoding fails with a
    * non-fatal error.
    */
  override val serializer: Serializer[T] = new Serializer[T] with SerdeDefaults {
    override def serialize(topic: String, t: T): Array[Byte] =
      Option(t).map { value =>
        // Render compact JSON and encode it with an explicit charset so the bytes written
        // here always match what the deserializer below expects.
        Try(value.asJson.noSpaces.getBytes(StandardCharsets.UTF_8)).fold(
          ex => throw new SerializationException("Error serializing JSON message", ex),
          jsonBytes => jsonBytes
        )
      }.orNull
  }

  /** Deserializer that parses UTF-8 JSON bytes and decodes them into `T`.
    *
    * Throws `SerializationException` (wrapping the circe error) if the payload is not valid
    * JSON or cannot be decoded as `T`.
    */
  override val deserializer: Deserializer[T] = new Deserializer[T] with SerdeDefaults {
    override def deserialize(topic: String, bytes: Array[Byte]): T =
      Option(bytes).map { payload =>
        // Decode explicitly as UTF-8: `new String(bytes)` would use the JVM's default
        // charset and corrupt non-ASCII characters on hosts where that is not UTF-8.
        decode[T](new String(payload, StandardCharsets.UTF_8)).fold(
          err => throw new SerializationException("Error deserializing JSON message", err),
          value => value
        )
      }.orNull
  }
}
/** Companion providing a factory for [[KafkaJsonSerde]]. */
object KafkaJsonSerde {

  /** Creates a JSON serde for `T`.
    *
    * @tparam T value type; requires a circe `Encoder` and `Decoder` in implicit scope and
    *           must admit `null` (`T >: Null`)
    * @return a new `KafkaJsonSerde[T]`
    */
  def apply[T >: Null: Encoder: Decoder](): KafkaJsonSerde[T] = new KafkaJsonSerde[T]()
}
/** Mixin supplying do-nothing `configure` and `close` implementations, so the serde,
  * serializer and deserializer in this file do not each have to spell them out.
  */
trait SerdeDefaults {

  /** Ignores the supplied configuration.
    *
    * @param configs configuration entries; not read
    * @param isKey   whether the component is used for record keys; not read
    */
  def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

  /** Does nothing; there are no resources to release. */
  def close(): Unit = ()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment