Custom serde for windowed records. Why? See http://mail-archives.apache.org/mod_mbox/kafka-users/201701.mbox/%3cCABQKjk+NtexSyt3gB0hA8w0kYqSkwxWkVUpvSXH9h8uJD_SLZA@mail.gmail.com%3e.
Created
January 16, 2017 18:41
-
-
Save nfo/eaf350afb5667a3516593da4d48e757a to your computer and use it in GitHub Desktop.
Kafka Streams - A serde for timed windows (start + end)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TimeWindowedDeserializer<T> implements Deserializer<Windowed<T>> { | |
private static final int TIMESTAMP_SIZE = 8; | |
private Deserializer<T> inner; | |
public TimeWindowedDeserializer(Deserializer<T> inner) { | |
this.inner = inner; | |
} | |
@Override | |
public void configure(Map<String, ?> configs, boolean isKey) { | |
// do nothing | |
} | |
@Override | |
public Windowed<T> deserialize(String topic, byte[] data) { | |
// Read the inner data | |
byte[] bytes = new byte[data.length - TIMESTAMP_SIZE - TIMESTAMP_SIZE]; | |
System.arraycopy(data, 0, bytes, 0, bytes.length); | |
// Read the start timestamp | |
byte[] startBytes = new byte[TIMESTAMP_SIZE]; | |
System.arraycopy(data, data.length - TIMESTAMP_SIZE - TIMESTAMP_SIZE, startBytes, 0, startBytes.length); | |
long start = ByteBuffer.wrap(startBytes).getLong(0); | |
// Read the end timestamp | |
byte[] endBytes = new byte[TIMESTAMP_SIZE]; | |
System.arraycopy(data, data.length - TIMESTAMP_SIZE, endBytes, 0, endBytes.length); | |
long end = ByteBuffer.wrap(endBytes).getLong(0); | |
// Read as a `TimeWindow`. | |
// An `UnlimitedWindow` is just a window with an end equal to `Long.MAX_VALUE`. | |
// And consumer code should should only use the superclass `Window`. | |
return new Windowed<T>(inner.deserialize(topic, bytes), new TimeWindow(start, end)); | |
} | |
@Override | |
public void close() { | |
inner.close(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Serde for instances of `Window` subclasses, as long as they do not contain other instance variables that `start` and `end`. | |
* It always deserializes as a `TimeWindow`. An `UnlimitedWindow` is a window with `end` = Long.MAX_VALUE. | |
* | |
* We do not use {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} because it does not serialize | |
* the window `end`, only the window `start`. | |
* | |
* @param <T> | |
*/ | |
public class TimeWindowedSerde<T> implements Serde<Windowed<T>> { | |
private final Serde<Windowed<T>> inner; | |
public TimeWindowedSerde(Serde<T> serde) { | |
inner = Serdes.serdeFrom( | |
new TimeWindowedSerializer<>(serde.serializer()), | |
new TimeWindowedDeserializer<>(serde.deserializer())); | |
} | |
@Override | |
public Serializer<Windowed<T>> serializer() { | |
return inner.serializer(); | |
} | |
@Override | |
public Deserializer<Windowed<T>> deserializer() { | |
return inner.deserializer(); | |
} | |
@Override | |
public void configure(Map<String, ?> configs, boolean isKey) { | |
inner.serializer().configure(configs, isKey); | |
inner.deserializer().configure(configs, isKey); | |
} | |
@Override | |
public void close() { | |
inner.serializer().close(); | |
inner.deserializer().close(); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TimeWindowedSerializer<T> implements Serializer<Windowed<T>> { | |
private static final int TIMESTAMP_SIZE = 8; | |
private Serializer<T> inner; | |
public TimeWindowedSerializer(Serializer<T> inner) { | |
this.inner = inner; | |
} | |
@Override | |
public void configure(Map<String, ?> configs, boolean isKey) { | |
// do nothing | |
} | |
@Override | |
public byte[] serialize(String topic, Windowed<T> data) { | |
byte[] serializedKey = inner.serialize(topic, data.key()); | |
ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + TIMESTAMP_SIZE); | |
buf.put(serializedKey); | |
buf.putLong(data.window().start()); | |
buf.putLong(data.window().end()); | |
return buf.array(); | |
} | |
@Override | |
public void close() { | |
inner.close(); | |
} | |
public byte[] serializeBaseKey(String topic, Windowed<T> data) { | |
return inner.serialize(topic, data.key()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I'm having trouble compiling this due to the TimeWindow class (line 36 of Deserializer). What class is it from?