Skip to content

Instantly share code, notes, and snippets.

@dminkovsky
Created February 22, 2017 16:47
Show Gist options
  • Save dminkovsky/fb249c59a2446bf18f9a7b9a24ef7f50 to your computer and use it in GitHub Desktop.
Save dminkovsky/fb249c59a2446bf18f9a7b9a24ef7f50 to your computer and use it in GitHub Desktop.
// Derives a secondary index keyed by login id from the changelog of `users` and writes it to the
// "users-by-login-id" topic. For every change to a user, the transformer compares the new value
// with the last value it saw (kept in the "users-by-login-id-index" state store) and emits one
// record per affected login id: the user itself for current logins, null (a tombstone) for
// logins the user no longer has.
users
    .toStream()
    .transform(() -> new Transformer<ByteString, Messages.User, KeyValue<ByteString, List<KeyValue<ByteString, Messages.User>>>>() {

        // Last value seen for each user key; used to work out which login ids went away.
        private KeyValueStore<ByteString, Messages.User> store;

        /**
         * Looks up the state store that holds the previously seen value of each user.
         *
         * @param context processor context used to resolve the state store by name
         */
        @Override
        @SuppressWarnings("unchecked") // getStateStore is untyped; the store is used with these types throughout
        public void init(ProcessorContext context) {
            store = (KeyValueStore<ByteString, Messages.User>) context.getStateStore("users-by-login-id-index");
        }

        /**
         * Computes the index changes caused by one change to a user.
         *
         * @param key  key of the user record
         * @param next new value of the user, or null if the user was deleted
         * @return the user key paired with the list of index records to emit (login id to user,
         *         or login id to null for removed entries), or null when nothing has to be emitted
         */
        @Override
        public KeyValue<ByteString, List<KeyValue<ByteString, Messages.User>>> transform(ByteString key, Messages.User next) {
            Messages.User last = store.get(key);
            List<KeyValue<ByteString, Messages.User>> updates = new ArrayList<>();

            if (next == null) {
                // User was deleted, but we never saw it: nothing to un-index.
                if (last == null) {
                    return null;
                }
                // Forget the user. Without this the stale value would be diffed against a later
                // re-creation of the same key and could emit tombstones for login ids that have
                // since been indexed for another user.
                store.delete(key);
                // Delete all index entries of the removed user.
                for (Messages.Login login : last.getLoginsList()) {
                    updates.add(new KeyValue<>(login.getId(), null));
                }
            } else {
                store.put(key, next);
                if (last != null) {
                    // User was updated: delete index entries whose login id is gone. Compare by
                    // id (the index key) so a login that merely changed other fields does not
                    // get a tombstone right before being re-indexed below.
                    for (Messages.Login login : last.getLoginsList()) {
                        if (!hasLogin(next, login.getId())) {
                            updates.add(new KeyValue<>(login.getId(), null));
                        }
                    }
                }
                // Created or updated: (re-)index the user under every current login id.
                for (Messages.Login login : next.getLoginsList()) {
                    updates.add(new KeyValue<>(login.getId(), next));
                }
            }

            if (updates.isEmpty()) {
                return null;
            }
            return new KeyValue<>(key, updates);
        }

        // Returns true if the user has a login with the given id.
        private boolean hasLogin(Messages.User user, ByteString loginId) {
            for (Messages.Login login : user.getLoginsList()) {
                if (login.getId().equals(loginId)) {
                    return true;
                }
            }
            return false;
        }

        /** No periodic work; never emits anything. */
        @Override
        public KeyValue<ByteString, List<KeyValue<ByteString, Messages.User>>> punctuate(long timestamp) {
            return null;
        }

        /** Nothing to release. */
        @Override
        public void close() {}
    }, "users-by-login-id-index")
    // Unpack the per-user list into individual (login id, user-or-null) records.
    .flatMap((k, v) -> v)
    .to(byteStringSerde, userSerde, "users-by-login-id");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment