Skip to content

Instantly share code, notes, and snippets.

@litvil
Created June 8, 2018 16:59
Show Gist options
  • Save litvil/5218f59cfedfcdb43b764eacf5dbce0a to your computer and use it in GitHub Desktop.
Save litvil/5218f59cfedfcdb43b764eacf5dbce0a to your computer and use it in GitHub Desktop.
Camel Kafka Avro serializer and deserializer
package com.company.mapping;
import io.confluent.common.config.ConfigException;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
 * Kafka {@link Deserializer} that hands record bytes to the {@code deserialize(byte[])}
 * method inherited from {@link AbstractKafkaAvroDeserializer}.
 *
 * <p>The schema registry address is read from the {@code schema.registry.url} entry of the
 * properties passed to {@link #configure(Map, boolean)}. When that entry is absent the
 * deserializer falls back to {@code http://localhost:8081}.
 */
public class CamelKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
    implements Deserializer<Object> {

  /** Property consulted for the schema registry address(es). */
  private static final String URL_CONFIG_KEY = "schema.registry.url";

  /** Registry address used when the supplied properties do not name one. */
  private static final List<String> DEFAULT_REGISTRY_URLS =
      Collections.singletonList("http://localhost:8081");

  /** Addresses the registry client is built from; replaced by {@link #configure(Map, boolean)}. */
  private List<String> registryUrls = DEFAULT_REGISTRY_URLS;

  /**
   * Builds the schema registry client from the currently resolved addresses and switches the
   * base class flag {@code useSpecificAvroReader} on.
   *
   * @param config ignored; this class is configured from the raw property map instead
   * @throws org.apache.kafka.common.config.ConfigException if the registry client rejects its
   *     configuration; the original exception is attached as the cause
   */
  @Override
  protected void configure(KafkaAvroDeserializerConfig config) {
    try {
      // NOTE(review): the second argument is presumably the client's schema cache capacity
      // (effectively unbounded here) -- confirm against the Confluent client in use.
      this.schemaRegistry = new CachedSchemaRegistryClient(registryUrls, Integer.MAX_VALUE);
      this.useSpecificAvroReader = true;
    } catch (ConfigException e) {
      // Kafka's ConfigException has no cause-taking constructor, so attach the cause explicitly.
      org.apache.kafka.common.config.ConfigException wrapped =
          new org.apache.kafka.common.config.ConfigException(e.getMessage());
      wrapped.initCause(e);
      throw wrapped;
    }
  }

  /**
   * Resolves the registry address(es) from the supplied properties and (re)builds the client.
   *
   * @param configs consumer properties; may be {@code null} or lack {@code schema.registry.url},
   *     in which case the localhost default is used
   * @param isKey unused
   * @throws org.apache.kafka.common.config.ConfigException if {@code schema.registry.url} is
   *     present but contains no usable address
   */
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    this.registryUrls = resolveRegistryUrls(configs);
    // The typed config object is not used by this class, hence the explicit null.
    configure((KafkaAvroDeserializerConfig) null);
  }

  /**
   * Deserializes a record payload.
   *
   * @param s topic name; unused
   * @param bytes raw record bytes, passed unchanged to the inherited {@code deserialize(byte[])}
   * @return whatever the inherited {@code deserialize(byte[])} returns for {@code bytes}
   */
  @Override
  public Object deserialize(String s, byte[] bytes) {
    return deserialize(bytes);
  }

  /** No-op: this class releases nothing on close. */
  @Override
  public void close() {
  }

  /** Extracts the registry addresses from {@code configs}, defaulting when the key is absent. */
  private static List<String> resolveRegistryUrls(Map<String, ?> configs) {
    Object raw = (configs == null) ? null : configs.get(URL_CONFIG_KEY);
    if (raw == null) {
      return DEFAULT_REGISTRY_URLS;
    }

    List<String> urls = new java.util.ArrayList<String>();
    if (raw instanceof java.util.Collection) {
      for (Object item : (java.util.Collection<?>) raw) {
        addIfPresent(urls, item == null ? null : item.toString());
      }
    } else {
      // A plain string may carry several comma-separated addresses.
      for (String part : raw.toString().split(",")) {
        addIfPresent(urls, part);
      }
    }

    if (urls.isEmpty()) {
      throw new org.apache.kafka.common.config.ConfigException("No schema registry provided");
    }
    return urls;
  }

  /** Appends the trimmed {@code candidate} to {@code target} unless it is null or blank. */
  private static void addIfPresent(List<String> target, String candidate) {
    if (candidate == null) {
      return;
    }
    String trimmed = candidate.trim();
    if (!trimmed.isEmpty()) {
      target.add(trimmed);
    }
  }
}
package com.company.plugins;
import io.confluent.common.config.ConfigException;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
 * Kafka {@link Serializer} that hands records to the {@code serializeImpl} method inherited
 * from {@link AbstractKafkaAvroSerializer}.
 *
 * <p>The schema registry address is read from the {@code schema.registry.url} entry of the
 * properties passed to {@link #configure(Map, boolean)}. When that entry is absent the
 * serializer falls back to {@code http://localhost:8081}.
 */
public class CamelKafkaAvroSerializer extends
    AbstractKafkaAvroSerializer implements Serializer<Object> {

  /** Registry address used when the supplied properties do not name one. */
  private static final List<String> SCHEMA_REGISTRY_URL =
      Collections.singletonList("http://localhost:8081");

  /** Property consulted for the schema registry address(es). */
  private static final String URL_CONFIG_KEY = "schema.registry.url";

  /** Addresses the registry client is built from; replaced by {@link #configure(Map, boolean)}. */
  private List<String> registryUrls = SCHEMA_REGISTRY_URL;

  /**
   * Builds the schema registry client from the currently resolved addresses.
   *
   * @param config ignored; this class is configured from the raw property map instead
   * @throws org.apache.kafka.common.config.ConfigException if the registry client rejects its
   *     configuration; the original exception is attached as the cause
   */
  @Override
  protected void configure(KafkaAvroSerializerConfig config) {
    try {
      // NOTE(review): the second argument is presumably the client's schema cache capacity
      // (effectively unbounded here) -- confirm against the Confluent client in use.
      this.schemaRegistry = new CachedSchemaRegistryClient(registryUrls, Integer.MAX_VALUE);
    } catch (ConfigException e) {
      // Kafka's ConfigException has no cause-taking constructor, so attach the cause explicitly.
      org.apache.kafka.common.config.ConfigException wrapped =
          new org.apache.kafka.common.config.ConfigException(e.getMessage());
      wrapped.initCause(e);
      throw wrapped;
    }
  }

  /**
   * Resolves the registry address(es) from the supplied properties and (re)builds the client.
   *
   * @param configs producer properties; may be {@code null} or lack {@code schema.registry.url},
   *     in which case the localhost default is used
   * @param isKey unused
   * @throws org.apache.kafka.common.config.ConfigException if {@code schema.registry.url} is
   *     present but contains no usable address
   */
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    this.registryUrls = resolveRegistryUrls(configs);
    // The typed config object is not used by this class, hence the explicit null.
    configure((KafkaAvroSerializerConfig) null);
  }

  /**
   * Serializes a record.
   *
   * @param s topic name, forwarded unchanged as the first argument of {@code serializeImpl}
   * @param o record to serialize
   * @return whatever the inherited {@code serializeImpl} returns
   */
  @Override
  public byte[] serialize(final String s, final Object o) {
    // NOTE(review): the raw topic is passed where the base class appears to expect a registry
    // subject, and isKey is never consulted -- confirm this naming is intended before changing
    // it, since existing registered subjects would be affected.
    return super.serializeImpl(s, o);
  }

  /** No-op: this class releases nothing on close. */
  @Override
  public void close() {
  }

  /** Extracts the registry addresses from {@code configs}, defaulting when the key is absent. */
  private static List<String> resolveRegistryUrls(Map<String, ?> configs) {
    Object raw = (configs == null) ? null : configs.get(URL_CONFIG_KEY);
    if (raw == null) {
      return SCHEMA_REGISTRY_URL;
    }

    List<String> urls = new java.util.ArrayList<String>();
    if (raw instanceof java.util.Collection) {
      for (Object item : (java.util.Collection<?>) raw) {
        addIfPresent(urls, item == null ? null : item.toString());
      }
    } else {
      // A plain string may carry several comma-separated addresses.
      for (String part : raw.toString().split(",")) {
        addIfPresent(urls, part);
      }
    }

    if (urls.isEmpty()) {
      throw new org.apache.kafka.common.config.ConfigException("No schema registry provided");
    }
    return urls;
  }

  /** Appends the trimmed {@code candidate} to {@code target} unless it is null or blank. */
  private static void addIfPresent(List<String> target, String candidate) {
    if (candidate == null) {
      return;
    }
    String trimmed = candidate.trim();
    if (!trimmed.isEmpty()) {
      target.add(trimmed);
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment