@mseemann
Created September 3, 2021 13:52
UserCollectionChangeStreamService
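reactiveMongoTemplate
    .changeStream(User.class)
    .watchCollection("user")
    .listen()
    .subscribe(event -> {
        log.info("change stream event {}", event);
        // The resume token is a document whose "_data" field holds the opaque token string.
        BsonDocument newResumeToken = Objects.requireNonNull(event.getRaw()).getResumeToken().asDocument();
        BsonString token = newResumeToken.getString("_data");
        // BsonTimestamp.getTime() returns seconds since the epoch; convert to milliseconds.
        var timeStampInMs = Objects.requireNonNull(event.getBsonTimestamp()).getTime() * 1000L;
        // Hand the changed document, its resume token, and the timestamp to the sync service.
        syncToPostgresService.syncInASingleTx(event.getBody(), token.getValue(), timeStampInMs);
    });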
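
The `* 1000L` in the subscriber works because a BSON timestamp packs two 32-bit values into one 64-bit value: seconds since the Unix epoch in the upper half and a per-second operation counter in the lower half. The following is a minimal sketch of that layout using only the JDK. `BsonTimestampSketch` and its methods are hypothetical stand-ins for `org.bson.BsonTimestamp`, not part of the gist or of the MongoDB driver.

```java
import java.time.Instant;

// Hypothetical stand-in for org.bson.BsonTimestamp so the sketch runs without the MongoDB driver.
final class BsonTimestampSketch {

    // Upper 32 bits: seconds since the Unix epoch (the value BsonTimestamp.getTime() exposes).
    static int seconds(long packed) {
        return (int) (packed >> 32);
    }

    // Lower 32 bits: ordinal of the operation within that second (the value getInc() exposes).
    static int increment(long packed) {
        return (int) packed;
    }

    // Same conversion as in the subscriber: the long literal widens the int before multiplying.
    static long toEpochMillis(long packed) {
        return seconds(packed) * 1000L;
    }

    public static void main(String[] args) {
        // Example value: 2021-09-03T13:52:00Z, operation number 7 within that second.
        long packed = (1_630_677_120L << 32) | 7L;
        System.out.println(Instant.ofEpochMilli(toEpochMillis(packed)));
        System.out.println(increment(packed));
    }
}
```

Because the timestamp only has second resolution, `timeStampInMs` always ends in `000`. Events within the same second differ only in the increment, which the subscriber does not pass on.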