Skip to content

Instantly share code, notes, and snippets.

import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import org.apache.kafka.common.serialization.Serializer
import java.util.HashMap
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroSerializer
class StringAvroSerializer<T> : Serializer<T> {
import org.apache.kafka.common.serialization.Deserializer
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
class StringAvroDeserializer : Deserializer<String> {
private val inner: KafkaAvroDeserializer
@vikas-tikoo-zefr
vikas-tikoo-zefr / ConvertKConnectTimestampField.java
Last active January 4, 2018 19:31
Convert the record's specified field from a Kafka Connect timestamp to an optional INT64 (long) value
package com.zefr.connect.transforms;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
@vikas-tikoo-zefr
vikas-tikoo-zefr / Namespacefy.java
Created December 13, 2017 18:46
Add a namespace to the Kafka Connect schema
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.apache.kafka.connect.transforms.util.SimpleConfig;