@polster
Last active January 3, 2022 21:59
Spring Cloud Stream: BiConsumer
spring:
  cloud:
    stream:
      bindings:
        processPersonEvents-in-0:
          destination: person-changed-events
        processPersonEvents-in-1:
          destination: address-changed-events
      default:
        resetOffsets: true
        startOffsets: earliest
      kafka:
        streams:
          binder:
            configuration:
              num.stream.threads: 2
...
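@Bean
public BiConsumer<KTable<String, PersonChangedEvent>, KTable<String, AddressChangedEvent>> processPersonEvents(
        ProjectorService projectorService,
        MappingContext mappingContext) {
    // The first argument is bound to processPersonEvents-in-0, the second to processPersonEvents-in-1.
    return (personChangedEvents, addressChangedEvents) -> {
        personChangedEvents
                // Build an aggregate root from the latest person event for each key.
                .mapValues(personChangedEvent -> PersonAggregateRoot.empty().apply(personChangedEvent, mappingContext))
                // Inner join on the record key: enrich the aggregate with the matching address event.
                .join(addressChangedEvents, (root, addressChangedEvent) -> root.apply(addressChangedEvent, mappingContext))
                // Project the aggregate into the read model, then keep only the person id for logging.
                .mapValues(projectorService::project)
                .mapValues(PersonAggregateRoot::getPersonId)
                .toStream()
                .foreach((k, v) -> log.info("Read model updated for key '{}' and personId '{}'", k, v));
    };
}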
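
The join step above can be hard to picture without a running Kafka cluster, so here is a minimal plain-Java sketch of the idea. It uses no Kafka Streams API: two maps stand in for the two KTables (latest value per key), and the hypothetical helper `KTableJoinSketch.join` mimics an inner join on the record key. The class name, the helper, and the sample data are all assumptions made for illustration. A real KTable join also emits updated results continuously as either side changes, which this snapshot does not model.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java stand-in for an inner KTable-KTable join; this is NOT the Kafka Streams API.
public class KTableJoinSketch {

    // Joins two "tables" (latest value per key) on their shared keys.
    public static <K, L, R, O> Map<K, O> join(Map<K, L> left, Map<K, R> right,
                                              BiFunction<L, R, O> joiner) {
        Map<K, O> result = new LinkedHashMap<>();
        for (Map.Entry<K, L> entry : left.entrySet()) {
            R other = right.get(entry.getKey());
            if (other != null) { // inner join: keys present on only one side are dropped
                result.put(entry.getKey(), joiner.apply(entry.getValue(), other));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample data; a table keeps only the latest value per key.
        Map<String, String> persons = new LinkedHashMap<>();
        persons.put("p1", "Alice");
        persons.put("p2", "Bob");
        persons.put("p1", "Alice Smith"); // a later event replaces the earlier value for p1

        Map<String, String> addresses = new LinkedHashMap<>();
        addresses.put("p1", "Main Street 1");

        Map<String, String> joined = join(persons, addresses, (p, a) -> p + " @ " + a);
        System.out.println(joined); // prints {p1=Alice Smith @ Main Street 1}
    }
}
```

In this sketch `p2` produces no output because it has no address yet. The inner join in the gist behaves the same way: a person without a matching address event on the same key does not reach the projector.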