Created
January 18, 2024 17:12
-
-
Save Chuckame/f08e33d9f45fb47f6f431ccf1872d755 to your computer and use it in GitHub Desktop.
Kafka Avro serializer & deserializer built on jackson-dataformat-avro, wire-compatible with the Confluent Avro serializers — useful when you don't want to generate classes with the legacy Avro plugins and prefer to rely on plain Java classes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.databind.ObjectReader; | |
import com.fasterxml.jackson.dataformat.avro.AvroMapper; | |
import com.fasterxml.jackson.dataformat.avro.AvroSchema; | |
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer; | |
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; | |
import lombok.SneakyThrows; | |
import org.apache.avro.Schema; | |
import org.apache.kafka.common.errors.SerializationException; | |
import org.apache.kafka.common.serialization.Deserializer; | |
import java.io.ByteArrayInputStream; | |
import java.nio.ByteBuffer; | |
import java.util.Map; | |
public class JacksonConfluentAvroDeserializer<T> extends AbstractKafkaAvroDeserializer implements Deserializer<T> { | |
private static final int MAGIC_BYTE = 0x0; | |
protected final ObjectReader avroReader; | |
protected final AvroSchema readerSchema; | |
protected boolean isKey; | |
@SneakyThrows | |
public JacksonConfluentAvroDeserializer(AvroMapper avroMapper, Class<T> serializedType) { | |
this.avroReader = avroMapper.readerFor(serializedType); | |
this.readerSchema = avroMapper.schemaFor(serializedType); | |
} | |
@Override | |
public void configure(Map<String, ?> configs, boolean isKey) { | |
super.configure(new KafkaAvroDeserializerConfig(configs)); | |
this.isKey = isKey; | |
} | |
@Override | |
@SneakyThrows | |
public T deserialize(String topic, byte[] data) { | |
if (data == null) return null; | |
final var input = new ByteArrayInputStream(data); | |
if (input.read() != MAGIC_BYTE) { | |
throw new SerializationException("Missing magic byte"); | |
} | |
String subject = getSubjectName(topic, isKey, data, new io.confluent.kafka.schemaregistry.avro.AvroSchema(readerSchema.getAvroSchema())); | |
final var writerSchemaId = getInt(data, 1); | |
Schema writerRawSchema = (Schema) schemaRegistry.getSchemaBySubjectAndId(subject, writerSchemaId).rawSchema(); | |
final var schema = new AvroSchema(writerRawSchema).withReaderSchema(readerSchema); | |
return avroReader.with(schema).readValue(input); | |
} | |
private int getInt(byte[] bytes, int offset) { | |
return ByteBuffer.wrap(bytes, offset, Integer.BYTES).getInt(); | |
} | |
@Override | |
@SneakyThrows | |
public void close() { | |
super.close(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.databind.ObjectWriter; | |
import com.fasterxml.jackson.dataformat.avro.AvroMapper; | |
import io.confluent.kafka.schemaregistry.ParsedSchema; | |
import io.confluent.kafka.schemaregistry.avro.AvroSchema; | |
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer; | |
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; | |
import lombok.SneakyThrows; | |
import org.apache.kafka.common.serialization.Serializer; | |
import java.io.ByteArrayOutputStream; | |
import java.nio.ByteBuffer; | |
import java.util.Map; | |
public class JacksonConfluentAvroSerializer<T> extends AbstractKafkaAvroSerializer implements Serializer<T> { | |
private static final int MAGIC_BYTE = 0x0; | |
protected final ObjectWriter avroWriter; | |
protected final ParsedSchema writerSchema; | |
protected boolean isKey; | |
@SneakyThrows | |
public JacksonConfluentAvroSerializer(AvroMapper avroMapper, Class<T> serializedType) { | |
var writerSchema = avroMapper.schemaFor(serializedType); | |
this.writerSchema = new AvroSchema(writerSchema.getAvroSchema()); | |
this.avroWriter = avroMapper.writerFor(serializedType).with(writerSchema); | |
} | |
@Override | |
public void configure(Map<String, ?> configs, boolean isKey) { | |
super.configure(new KafkaAvroSerializerConfig(configs)); | |
this.isKey = isKey; | |
} | |
@Override | |
@SneakyThrows | |
public byte[] serialize(String topic, T data) { | |
var id = retrieveSchemaId(topic, data); | |
ByteArrayOutputStream out = new ByteArrayOutputStream(); | |
out.write(MAGIC_BYTE); | |
out.write(getIntBytes(id)); | |
avroWriter.writeValue(out, data); | |
return out.toByteArray(); | |
} | |
private static byte[] getIntBytes(int id) { | |
return ByteBuffer.allocate(Integer.BYTES).putInt(id).array(); | |
} | |
@SneakyThrows | |
protected int retrieveSchemaId(String topic, T data) { | |
String subject = getSubjectName(topic, isKey, data, writerSchema); | |
if (autoRegisterSchema) { | |
return register(subject, writerSchema, normalizeSchema); | |
} else { | |
return schemaRegistry.getId(subject, writerSchema); | |
} | |
} | |
@Override | |
@SneakyThrows | |
public void close() { | |
super.close(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment