Created
February 22, 2017 16:47
-
-
Save dminkovsky/fb249c59a2446bf18f9a7b9a24ef7f50 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Builds a secondary index of users keyed by login id.
//
// Each change to a user (keyed by the user's own key) is expanded into a list
// of index records: an upsert (loginId -> user) for every login the user
// currently has, and a tombstone (loginId -> null) for every login the user no
// longer has. The list is then flattened and written to "users-by-login-id".
//
// The state store "users-by-login-id-index" remembers the last seen version of
// each user so that removed logins can be detected.
users
    .toStream()
    .transform(() -> new Transformer<ByteString, Messages.User, KeyValue<ByteString, List<KeyValue<ByteString, Messages.User>>>>() {

        /** Last seen value of each user, keyed by the user's key. */
        private KeyValueStore<ByteString, Messages.User> store;

        /**
         * Looks up the state store that holds the previous version of each user.
         *
         * @param context processor context used to obtain the state store
         */
        @Override
        @SuppressWarnings("unchecked") // getStateStore returns an untyped StateStore; the cast cannot be checked at runtime
        public void init(ProcessorContext context) {
            store = (KeyValueStore<ByteString, Messages.User>) context.getStateStore("users-by-login-id-index");
        }

        /**
         * Converts one user change into the index records it implies.
         *
         * @param key  the user's key
         * @param next the user's new value, or {@code null} if the user was deleted
         * @return the user's key paired with the index records to emit, or
         *         {@code null} when there is nothing to emit
         */
        @Override
        public KeyValue<ByteString, List<KeyValue<ByteString, Messages.User>>> transform(ByteString key, Messages.User next) {
            Messages.User last = store.get(key);
            List<KeyValue<ByteString, Messages.User>> updates = new ArrayList<>();

            if (next == null) {
                // User was deleted, but nothing was ever stored for it: nothing to undo.
                if (last == null) {
                    return null;
                }
                // Forget the user so that a repeated tombstone for the same key does not
                // emit the login tombstones again (the login ids may have been taken over
                // by another user in the meantime) and so the store does not keep growing.
                store.delete(key);
                // Delete all indices that pointed at the removed user.
                for (Messages.Login login : last.getLoginsList()) {
                    updates.add(new KeyValue<>(login.getId(), null));
                }
            } else {
                store.put(key, next);
                if (last != null) {
                    // User was updated: delete indices for logins it no longer has.
                    // Logins are matched by id, because the id is the index key; a login
                    // whose other fields changed is simply overwritten by the upsert below.
                    for (Messages.Login login : last.getLoginsList()) {
                        boolean stillPresent = next.getLoginsList().stream()
                                .anyMatch(current -> current.getId().equals(login.getId()));
                        if (!stillPresent) {
                            updates.add(new KeyValue<>(login.getId(), null));
                        }
                    }
                }
                // User was created or updated: (re)index by all of its current logins.
                for (Messages.Login login : next.getLoginsList()) {
                    updates.add(new KeyValue<>(login.getId(), next));
                }
            }

            if (updates.isEmpty()) {
                return null;
            }
            return new KeyValue<>(key, updates);
        }

        /** No periodic work is needed; nothing is emitted on punctuation. */
        @Override
        public KeyValue<ByteString, List<KeyValue<ByteString, Messages.User>>> punctuate(long timestamp) {
            return null;
        }

        /** Nothing to release; this transformer holds no resources of its own. */
        @Override
        public void close() {}
    }, "users-by-login-id-index")
    // Flatten each per-user list of index records into individual records.
    .flatMap((k, v) -> v)
    .to(byteStringSerde, userSerde, "users-by-login-id");
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment