implicit class StreamsBuilderOps(streamsBuilder: StreamsBuilderS) {
  // Two-step call: the caller fixes V, and K is inferred from the implicit Record[K, V].
  def streamFromRecord[V] = new StreamBuilder[V]

  class StreamBuilder[V] {
    // The topic name comes from the Record instance; Consumed is picked up implicitly by `stream`.
    def apply[K]()(implicit record: Record[K, V], consumed: Consumed[K, V]): KStreamS[K, V] =
      streamsBuilder.stream[K, V](record.topic)
  }
}
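The two-step shape of `streamFromRecord[V]` followed by `apply[K]()` lets the caller name only the value type while the compiler infers the key type from the implicit `Record[K, V]`. Here is a minimal, Kafka-free sketch of that pattern. The `Record` trait, the `User` type, the `topicFor` helper, and the topic name are hypothetical stand-ins, not the definitions used in the listing above.

```scala
object PartialTypeApplication {
  // Hypothetical stand-in for the Record type class: ties a key/value pair to a topic name.
  trait Record[K, V] { def topic: String }

  final case class User(name: String)

  // Assumed instance: users are keyed by String on a topic called "users".
  implicit val userRecord: Record[String, User] = new Record[String, User] {
    val topic: String = "users"
  }

  // Step 1 fixes V explicitly and returns an intermediate object.
  def topicFor[V]: TopicFor[V] = new TopicFor[V]

  // Step 2 leaves K to be inferred from whichever Record[K, V] is in implicit scope.
  final class TopicFor[V] {
    def apply[K]()(implicit record: Record[K, V]): String = record.topic
  }

  // Only the value type is written at the call site.
  val usersTopic: String = topicFor[User]()
}
```

Splitting the type parameters across two calls is what makes this work: a single method `topicFor[K, V]` would force the caller to spell out both types or neither.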
val usersStream = streamsBuilder.streamFromRecord[User]()
val postsStream = streamsBuilder.streamFromRecord[Post]()
val commentsStream = streamsBuilder.streamFromRecord[Comment]()
val likesStream = streamsBuilder.streamFromRecord[Like]()
val usersByKey = usersStream
  .groupByKey
  .reduce((first, second) => if (first.updatedOn.isAfter(second.updatedOn)) first else second)

val postsByAuthor = postsStream
  .groupBy((_, post) => post.author)
  .aggregate(
    () => Map.empty[Id[Post], Post],
    (_, post: Post, posts: Map[Id[Post], Post]) =>
      if (posts.get(post.id).exists(_.updatedOn.isAfter(post.updatedOn))) posts
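Both aggregations above keep the most recent version of a record by comparing `updatedOn`. The sketch below replays that logic over plain collections so the behaviour with out-of-order updates is easy to check. The simplified `User` and `Post` types and the use of `String` ids are hypothetical. The `else` branch of `upsert` is an assumption, since the listing is cut off before it.

```scala
import java.time.Instant

object LatestVersionSketch {
  // Hypothetical simplified types; the source uses Id[Post] rather than String.
  final case class User(name: String, updatedOn: Instant)
  final case class Post(id: String, author: String, text: String, updatedOn: Instant)

  // Same comparison as the reducer above: keep `first` only if it is strictly newer.
  def latest(first: User, second: User): User =
    if (first.updatedOn.isAfter(second.updatedOn)) first else second

  // Keeps the stored post when it is strictly newer, as in the visible branch above.
  // ASSUMPTION: otherwise the incoming post is inserted or overwrites the stored one.
  def upsert(posts: Map[String, Post], post: Post): Map[String, Post] =
    if (posts.get(post.id).exists(_.updatedOn.isAfter(post.updatedOn))) posts
    else posts + (post.id -> post)

  val t0: Instant = Instant.parse("2018-01-01T00:00:00Z")

  // User updates for a single key, arriving out of order.
  val currentUser: User = List(
    User("alice-v2", t0.plusSeconds(60)),
    User("alice-v1", t0),
    User("alice-v3", t0.plusSeconds(120))
  ).reduce(latest)

  // Post events for a single author; the last one is older than the stored p1.
  val posts: Map[String, Post] = List(
    Post("p1", "alice", "draft", t0),
    Post("p2", "alice", "hello", t0.plusSeconds(10)),
    Post("p1", "alice", "edited", t0.plusSeconds(20)),
    Post("p1", "alice", "stale", t0.plusSeconds(5))
  ).foldLeft(Map.empty[String, Post])(upsert)
}
```

With equal timestamps, `latest` returns `second`, so a redelivered record with the same `updatedOn` replaces the stored one.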
val likesByKey = likesStream
  .groupByKey
  .aggregate(
    () => Set.empty[Like],
    (_, like: Like, likes: Set[Like]) => if (like.unliked) likes - like else likes + like,
    Materialized.`with`[Id[Post], Set[Like], KeyValueStore[Bytes, Array[Byte]]](
      CirceSerdes.serde[Id[Post]],
      CirceSerdes.serde[Set[Like]]
    )
  )
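Whether `likes - like` removes anything depends on how `Like` defines equality, which the listing does not show. The sketch below assumes a hypothetical `Like` case class whose structural equality includes the `unliked` flag, and compares the aggregator as written with a variant that clears the flag before removing. If the real `Like` compares differently (for example by user and post only), the two behave the same.

```scala
object LikesAggregation {
  // Hypothetical Like: case-class equality over all fields, including the flag.
  final case class Like(userId: String, postId: String, unliked: Boolean)

  // Same shape as the aggregator above.
  def asWritten(likes: Set[Like], like: Like): Set[Like] =
    if (like.unliked) likes - like else likes + like

  // Variant that clears the flag so the unlike event equals the stored like.
  def normalised(likes: Set[Like], like: Like): Set[Like] =
    if (like.unliked) likes - like.copy(unliked = false) else likes + like

  val events: List[Like] = List(
    Like("bob", "p1", unliked = false),
    Like("carol", "p1", unliked = false),
    Like("bob", "p1", unliked = true) // bob takes his like back
  )

  // Under the equality assumed here, bob's like is still present in this result.
  val asWrittenResult: Set[Like] = events.foldLeft(Set.empty[Like])(asWritten)

  // Only carol's like remains in this one.
  val normalisedResult: Set[Like] = events.foldLeft(Set.empty[Like])(normalised)
}
```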
val commentCountByKey = commentsStream
  .groupByKey
  .aggregate(
    () => Set.empty[Id[Comment]],
    (_, comment: Comment, commentIds: Set[Id[Comment]]) =>
      if (comment.deleted) commentIds - comment.id else commentIds + comment.id,
    Materialized.`with`[Id[Post], Set[Id[Comment]], KeyValueStore[Bytes, Array[Byte]]](
      CirceSerdes.serde[Id[Post]],
      CirceSerdes.serde[Set[Id[Comment]]]
    )
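Tracking comment ids in a set, rather than incrementing a counter, keeps the aggregation idempotent: an edited or redelivered comment does not inflate the total. The sketch below shows this over a plain list of events. The simplified `Comment` type is hypothetical, and taking `.size` to obtain the count is an assumption, since the listing is cut off before any such step.

```scala
object CommentCountSketch {
  // Hypothetical simplified Comment; the source uses Id[Comment] rather than String.
  final case class Comment(id: String, postId: String, deleted: Boolean)

  // Same shape as the aggregator above: add the id, or remove it on deletion.
  def step(ids: Set[String], comment: Comment): Set[String] =
    if (comment.deleted) ids - comment.id else ids + comment.id

  val events: List[Comment] = List(
    Comment("c1", "p1", deleted = false),
    Comment("c2", "p1", deleted = false),
    Comment("c1", "p1", deleted = false), // edit or redelivery of c1: the set is unchanged
    Comment("c2", "p1", deleted = true)   // c2 is deleted
  )

  val ids: Set[String] = events.foldLeft(Set.empty[String])(step)

  // ASSUMPTION: the count is derived from the size of the id set.
  val count: Int = ids.size
}
```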
postsByAuthor
  .join(usersByKey,
    (posts: Set[Post], author: User) =>
      posts.map(DenormalisedPost(_, author, DenormalisedPost.Interactions(Set.empty, 0))))
  .toStream
  .flatMapValues(identity)
  .groupBy((_, denormalisedPost) => denormalisedPost.post.id)
  .reduce((first, second) => if (first.post.updatedOn.isAfter(second.post.updatedOn)) first else second)
  .leftJoin(likesByKey,
    (denormalisedPost: DenormalisedPost, likes: Set[Like]) =>
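The `flatMapValues(identity)`, `groupBy`, `reduce` sequence above turns a table keyed by author, whose values are collections of posts, into a table keyed by post id. The same re-keying can be sketched over plain Scala collections. The `Post` type and its integer `version` field are hypothetical simplifications standing in for the `updatedOn` comparison.

```scala
object RekeyByPost {
  // Hypothetical simplified Post; `version` stands in for the updatedOn timestamp.
  final case class Post(id: String, author: String, version: Int)

  // Table keyed by author, each value holding that author's posts.
  val byAuthor: Map[String, Set[Post]] = Map(
    "alice" -> Set(Post("p1", "alice", 1), Post("p2", "alice", 1)),
    "bob"   -> Set(Post("p3", "bob", 1))
  )

  // Analogue of flatMapValues(identity): one (author, post) record per element of each set.
  val flattened: List[(String, Post)] =
    byAuthor.toList.flatMap { case (author, posts) =>
      posts.toList.map(post => (author, post))
    }

  // Analogue of groupBy + reduce: re-key by post id and keep the newest version per id.
  val byPostId: Map[String, Post] =
    flattened
      .groupBy { case (_, post) => post.id }
      .map { case (id, records) =>
        id -> records.map(_._2).reduce((a, b) => if (a.version > b.version) a else b)
      }
}
```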
implicit class KStreamOps[K, V](stream: KStreamS[K, V]) {
  // Writes the stream to the topic named by the implicit Record[K, V].
  def toTopic(implicit record: Record[K, V], produced: Produced[K, V]) = stream.to(record.topic)
}
...
  .toStream
  .toTopic
...
  .join(likesByKey,
    (denormalisedPost: DenormalisedPost, likes: Set[Like]) =>
      denormalisedPost.copy(interactions = denormalisedPost.interactions.copy(likes = likes)))
  .join(commentCountByKey,
    (denormalisedPost: DenormalisedPost, commentCount: Int) =>
      denormalisedPost.copy(interactions = denormalisedPost.interactions.copy(comments = commentCount)))
...
  .join(likesByKey,
    (denormalisedPost: DenormalisedPost, likes: Set[Like]) =>
      denormalisedPost.lens(_.interactions.likes).set(likes))
  .join(commentCountByKey,
    (denormalisedPost: DenormalisedPost, commentCount: Int) =>
      denormalisedPost.lens(_.interactions.comments).set(commentCount))
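The lens version replaces the nested `copy` calls with a path into the structure. To show what such a lens does underneath, here is a small hand-rolled sketch. It is not the API of the lens library used in the listing (which is not named there); the `Lens` class, its `andThen` method, and the simplified `DenormalisedPost` and `Interactions` types are all hypothetical.

```scala
object LensSketch {
  // Hypothetical simplified versions of the types in the listing.
  final case class Interactions(likes: Set[String], comments: Int)
  final case class DenormalisedPost(title: String, interactions: Interactions)

  // Minimal lens: a getter plus a setter that returns an updated copy.
  final case class Lens[S, A](get: S => A, set: (S, A) => S) {
    // Composes this lens with one that focuses further inside A.
    def andThen[B](inner: Lens[A, B]): Lens[S, B] =
      Lens[S, B](
        s => inner.get(get(s)),
        (s, b) => set(s, inner.set(get(s), b))
      )
  }

  val interactionsLens: Lens[DenormalisedPost, Interactions] =
    Lens[DenormalisedPost, Interactions](_.interactions, (p, i) => p.copy(interactions = i))

  val commentsLens: Lens[Interactions, Int] =
    Lens[Interactions, Int](_.comments, (i, n) => i.copy(comments = n))

  // Focuses on DenormalisedPost -> interactions -> comments.
  val postComments: Lens[DenormalisedPost, Int] = interactionsLens.andThen(commentsLens)

  val post: DenormalisedPost = DenormalisedPost("hello", Interactions(Set.empty, 0))

  // The nested-copy form and the lens form produce equal values.
  val viaCopy: DenormalisedPost = post.copy(interactions = post.interactions.copy(comments = 3))
  val viaLens: DenormalisedPost = postComments.set(post, 3)
}
```

The benefit grows with nesting depth: each extra level adds another `copy` to the manual form, while the lens form only extends the path.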