Last active
June 9, 2023 12:06
-
-
Save ErunamoJAZZ/ce0e31a98ed9240e2efaa9a3135684bb to your computer and use it in GitHub Desktop.
play-json JsValue, Kafka Serializer and Deserializer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.common.serialization.Deserializer | |
import play.api.libs.json.Json | |
/**
 * Kafka deserializer that decodes a record payload as UTF-8 text and parses it
 * into a play-json `JsValue`.
 *
 * A `null` payload is passed through as `null`. Any non-fatal failure while
 * parsing is rethrown as a `SerializationException` that carries the original
 * error as its cause.
 */
class JsValueDeserializer extends Deserializer[play.api.libs.json.JsValue] {

  // Local imports keep the class self-contained: none of these names are
  // imported ahead of this class at file level.
  import java.nio.charset.StandardCharsets
  import org.apache.kafka.common.errors.SerializationException
  import play.api.libs.json.JsValue
  import scala.util.control.NonFatal

  /** No configuration is needed; both arguments are ignored. */
  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {
    // nothing to do
  }

  /**
   * Parses `data` as UTF-8 encoded JSON.
   *
   * @param topic topic the record came from; only used in the error message
   * @param data  raw payload, may be `null`
   * @return the parsed `JsValue`, or `null` when `data` is `null`
   * @throws SerializationException if the payload cannot be parsed as JSON
   */
  override def deserialize(topic: String, data: Array[Byte]): JsValue =
    Option(data).map { bytes =>
      // Using the Charset constant (instead of the name "UTF8") means no
      // UnsupportedEncodingException can be raised while decoding.
      try Json.parse(new String(bytes, StandardCharsets.UTF_8))
      catch {
        case NonFatal(cause) =>
          throw new SerializationException(
            s"Error when deserializing Array[Byte] to JsValue for topic $topic: payload is not valid JSON",
            cause
          )
      }
    }.orNull

  /** Holds no resources, so there is nothing to release. */
  override def close(): Unit = {
    // nothing to do
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example based on http://niels.nu/blog/2016/kafka-custom-serializers.html
import org.apache.kafka.common.serialization.Serializer | |
import org.apache.kafka.common.errors.SerializationException | |
import java.io.UnsupportedEncodingException | |
import play.api.libs.json.JsValue | |
/**
 * Kafka serializer that writes a play-json `JsValue` as the UTF-8 bytes of its
 * `toString` rendering.
 *
 * A `null` value is passed through as a `null` payload.
 */
class JsValueSerializer extends Serializer[JsValue] {

  // Local import: the Charset constant replaces the former "UTF8" name lookup.
  import java.nio.charset.StandardCharsets

  /** No configuration is needed; both arguments are ignored. */
  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {
    // nothing to do
  }

  /**
   * Encodes `data.toString` as UTF-8 bytes.
   *
   * @param topic topic the record is destined for; not used
   * @param data  value to serialize, may be `null`
   * @return the UTF-8 bytes of `data.toString`, or `null` when `data` is `null`
   */
  override def serialize(topic: String, data: JsValue): Array[Byte] =
    // getBytes(Charset) cannot raise UnsupportedEncodingException, so the
    // former try/catch around the name-based lookup is no longer needed.
    // NOTE(review): this relies on JsValue.toString yielding the JSON text,
    // exactly as the previous implementation did.
    Option(data).map(_.toString.getBytes(StandardCharsets.UTF_8)).orNull

  /** Holds no resources, so there is nothing to release. */
  override def close(): Unit = {
    // nothing to do
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Please, if you are writing a Kafka Serializer, use Chill and Chill-bijection for this!
https://github.com/twitter/chill/#chill-bijection