Last active
January 21, 2019 12:11
-
-
Save kbastani/ec39907a43a77b88b4c65775d9f1f250 to your computer and use it in GitHub Desktop.
Simpler KStream API Example: See KafkaController.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Aggregate<T> implements Aggregator<Long, KafkaEvent, KafkaEvent> { | |
private final Class<T> clazz; | |
private final Aggregator<Long, T, T> aggregation; | |
public Aggregate(Class<T> clazz, Aggregator<Long, T, T> aggregation) { | |
this.clazz = clazz; | |
this.aggregation = aggregation; | |
} | |
@Override | |
public KafkaEvent apply(Long aggKey, KafkaEvent value, KafkaEvent aggregate) { | |
return KafkaEvent.withPayload(aggregation.apply(aggKey, value.to(clazz), aggregate.to(clazz))); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Filter<T> implements Predicate<Long, KafkaEvent> { | |
private final Class<T> clazz; | |
private final Function<T, Boolean> condition; | |
public Filter(Class<T> clazz, Function<T, Boolean> condition) { | |
this.clazz = clazz; | |
this.condition = condition; | |
} | |
@Override | |
public boolean test(Long key, KafkaEvent value) { | |
return condition.apply(value.to(clazz)); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Component | |
public class KafkaController { | |
@Configuration | |
@EnableKafka | |
@EnableKafkaStreams | |
public static class KafkaStreamsConfiguration { | |
//... Kafka Streams Producer/Consumer Initialization ... | |
/** | |
* A simplified example of abstract command/pipe operators that wrap around KStreams API. The KafkaEvent | |
* class is a serializable container that standardizes the input and output of data types on KTables | |
* and KStreams. The StreamCommands<T> class provides wrapped pipeline operators that automatically | |
* deserializes and serializes the input and output of serialized entities inside the KafkaEvent | |
* container. The KafkaEventSerde is the standard input and output serde that embeds any serializable | |
* object. The entire goal here is to abstract away serialization and to make the pipe operations | |
* on KafkaStreams more readable. | |
*/ | |
@Bean | |
public KStream<Long, KafkaEvent> movieStream(KStreamBuilder builder) { | |
KStream<Long, KafkaEvent> pipe = builder.stream(Serdes.Long(), new KafkaEventSerde(), "movie-stream"); | |
// Wraps the functional KStreams internals and provides generic type inference and serialization | |
StreamCommands<DomainEvent> commands = new StreamCommands<>(DomainEvent.class); | |
// Imagine the possibilities... | |
pipe.filter(commands.filter(event -> event.getType() == DomainEventType.READY)) | |
.map(commands.map((id, event) -> { | |
DomainEntity payload = event.getPayload(); | |
if (payload != null) | |
payload.setState(DomainEventType.PLAYING); | |
return event; | |
})).to(Serdes.Long(), new KafkaEventSerde(), "play-stream"); | |
// Would be even better if KStream<K,V> supported StreamCommands<T> operators | |
return pipe; | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class KafkaEvent implements Serializable { | |
private byte[] bytes; | |
private Long id; | |
private ObjectMapper objectMapper = new ObjectMapper(); | |
public KafkaEvent() { | |
objectMapper = new ObjectMapper(); | |
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); | |
} | |
public KafkaEvent(byte[] bytes, Long id) { | |
this.bytes = bytes; | |
this.id = id; | |
} | |
public KafkaEvent(byte[] bytes) { | |
this.bytes = bytes; | |
} | |
public KafkaEvent(Object payload) { | |
try { | |
this.bytes = new ObjectMapper().writeValueAsBytes(payload); | |
} catch (JsonProcessingException e) { | |
e.printStackTrace(); | |
} | |
} | |
public KafkaEvent(Long id, Object payload) { | |
this(payload); | |
this.id = id; | |
} | |
public byte[] getBytes() { | |
return bytes; | |
} | |
public void setBytes(byte[] bytes) { | |
this.bytes = bytes; | |
} | |
public Long getId() { | |
return id; | |
} | |
public void setId(Long id) { | |
this.id = id; | |
} | |
public <T> T to(Class<T> clazz) { | |
T result = null; | |
try { | |
if (bytes != null && bytes.length > 0) | |
result = objectMapper.readValue(this.bytes, clazz); | |
} catch (IOException e) { | |
throw new RuntimeException(e); | |
} | |
return result; | |
} | |
public <T> String toString(Class<T> clazz) { | |
String result = null; | |
try { | |
result = objectMapper.writeValueAsString(to(clazz)); | |
} catch (JsonProcessingException e) { | |
e.printStackTrace(); | |
} | |
assert result != null; | |
return result.trim(); | |
} | |
public static KafkaEvent withPayload(Object object) { | |
return new KafkaEvent(Long.valueOf(object.hashCode()), object); | |
} | |
public static KafkaEvent withPayload(Long key, Object object) { | |
return new KafkaEvent(key, object); | |
} | |
@Override | |
public String toString() { | |
String result; | |
if (bytes != null) { | |
result = ("{\"id\":" + id + ",\"message\":" + toString(Object.class) + "}").trim(); | |
} else { | |
result = super.toString(); | |
} | |
return result; | |
} | |
@Override | |
public boolean equals(Object o) { | |
if (this == o) return true; | |
if (o == null || getClass() != o.getClass()) return false; | |
KafkaEvent that = (KafkaEvent) o; | |
if (!Arrays.equals(bytes, that.bytes)) return false; | |
return id != null ? id.equals(that.id) : that.id == null; | |
} | |
@Override | |
public int hashCode() { | |
int result = Arrays.hashCode(bytes); | |
result = 31 * result + (id != null ? id.hashCode() : 0); | |
return result; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class KafkaEventSerde implements Serde<KafkaEvent> { | |
private final JsonSerializer<KafkaEvent> serializer = new JsonSerializer<>(); | |
private final JsonDeserializer<KafkaEvent> deserializer = new JsonDeserializer<>(KafkaEvent.class); | |
public KafkaEventSerde() { | |
serializer.configure(null, false); | |
deserializer.configure(null, false); | |
} | |
@Override | |
public void configure(Map<String, ?> configs, boolean isKey) { | |
this.serializer.configure(configs, isKey); | |
this.deserializer.configure(configs, isKey); | |
} | |
@Override | |
public void close() { | |
this.serializer.close(); | |
this.deserializer.close(); | |
} | |
@Override | |
public Serializer<KafkaEvent> serializer() { | |
return this.serializer; | |
} | |
@Override | |
public Deserializer<KafkaEvent> deserializer() { | |
return this.deserializer; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Mapper<V, R> implements KeyValueMapper<Long, KafkaEvent, KeyValue<Long, KafkaEvent>> { | |
private final Class<V> clazz; | |
private final KeyValueMapper<Long, V, R> mapper; | |
public Mapper(Class<V> clazz, KeyValueMapper<Long, V, R> mapper) { | |
this.clazz = clazz; | |
this.mapper = mapper; | |
} | |
@Override | |
public KeyValue<Long, KafkaEvent> apply(Long key, KafkaEvent value) { | |
return new KeyValue<>(key, KafkaEvent.withPayload(mapper.apply(key, value.to(clazz)))); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class StreamCommands<T> { | |
private Class<T> clazz; | |
public StreamCommands(Class<T> clazz) { | |
this.clazz = clazz; | |
} | |
public Filter<T> filter(Function<T, Boolean> predicate) { | |
return new Filter<>(clazz, predicate); | |
} | |
public Aggregate<T> aggregate(Aggregator<Long, T, T> aggregator) { | |
return new Aggregate<>(clazz, aggregator); | |
} | |
public Mapper<T, ?> map(KeyValueMapper<Long, T, ?> mapper) { | |
return new Mapper<>(clazz, mapper); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment