Skip to content

Instantly share code, notes, and snippets.

View joan38's full-sized avatar

Joan Goyeau joan38

View GitHub Profile
/**
 * Syntax extension for `StreamsBuilderS` that opens a stream on the topic carried by an
 * implicit `Record[K, V]`, so call sites do not spell out a topic name.
 *
 * Usage: `streamsBuilder.streamFromRecord[User]()`. Only the value type is written at the
 * call site; the key type `K` is left to be inferred from the implicit `Record[K, V]`.
 */
implicit class StreamsBuilderOps(streamsBuilder: StreamsBuilderS) {

  /** Intermediate builder that fixes the value type `V` before `apply` resolves `K`. */
  class StreamBuilder[V] {

    /**
     * Creates the stream for the topic named by `record.topic`.
     *
     * @param record   supplies the topic name
     * @param consumed required in implicit scope; it is not referenced by name in this body,
     *                 presumably it is picked up implicitly by `stream` (confirm against
     *                 `StreamsBuilderS`)
     * @return the stream returned by `streamsBuilder.stream` for that topic
     */
    def apply[K]()(implicit record: Record[K, V], consumed: Consumed[K, V]): KStreamS[K, V] = {
      val topic = record.topic
      streamsBuilder.stream[K, V](topic)
    }
  }

  /** First half of the two-step call: pins `V` and hands back the builder. */
  def streamFromRecord[V]: StreamBuilder[V] = new StreamBuilder[V]
}
// One source stream per record type. Only the value type is given at each call site; the
// topic (and the key type) come from the implicit `Record[K, V]` that `streamFromRecord`'s
// `apply` requires, so a matching `Record` and `Consumed` must be in implicit scope here.
val usersStream = streamsBuilder.streamFromRecord[User]()
val postsStream = streamsBuilder.streamFromRecord[Post]()
val commentsStream = streamsBuilder.streamFromRecord[Comment]()
val likesStream = streamsBuilder.streamFromRecord[Like]()
// Users grouped by their existing key and reduced to a single record per key: of the two
// records handed to the reducer, the left one is kept only when its `updatedOn` is strictly
// after the right one's; in every other case (including equal timestamps) the right one wins.
val usersByKey =
  usersStream
    .groupByKey
    .reduce(
      (left, right) =>
        if (left.updatedOn.isAfter(right.updatedOn)) left
        else right
    )
// Posts re-keyed by `post.author` and aggregated into a `Map[Id[Post], Post]` per author,
// starting from an empty map. When the map already holds a version of the incoming post whose
// `updatedOn` is after the incoming one's, the existing map is returned unchanged.
// NOTE(review): this excerpt stops after the `if` branch — the `else` branch, any
// `Materialized` argument and the closing parenthesis of `aggregate(` are not visible here.
val postsByAuthor = postsStream
.groupBy((_, post) => post.author)
.aggregate(
() => Map.empty[Id[Post], Post],
(_, post: Post, posts: Map[Id[Post], Post]) =>
if (posts.get(post.id).exists(_.updatedOn.isAfter(post.updatedOn))) posts
// Likes grouped by their existing key (typed as `Id[Post]` by the serdes below) and folded
// into a `Set[Like]` per key, starting from the empty set: an event flagged `unliked` is
// removed from the set, any other event is added. The resulting store is materialized with
// Circe-based serdes for the key and for the set.
// NOTE(review): removal relies on `Like` equality — if `unliked` takes part in `equals`, the
// un-like event will not match the like stored earlier. Confirm against `Like`'s definition.
val likesByKey =
  likesStream
    .groupByKey
    .aggregate(
      () => Set.empty[Like],
      (_, event: Like, current: Set[Like]) =>
        if (!event.unliked) current + event
        else current - event,
      Materialized.`with`[Id[Post], Set[Like], KeyValueStore[Bytes, Array[Byte]]](
        CirceSerdes.serde[Id[Post]],
        CirceSerdes.serde[Set[Like]]
      )
    )
// Comments grouped by their existing key (typed as `Id[Post]` by the serdes below) and folded
// into the set of comment ids per key, starting from the empty set: a comment flagged
// `deleted` has its id removed, any other comment has its id added.
// NOTE(review): the closing parenthesis of `aggregate(` is not visible in this excerpt, and
// the joins further down take `commentCount: Int` — presumably a step turning the id set into
// its size follows here; confirm against the full source.
val commentCountByKey = commentsStream
.groupByKey
.aggregate(
() => Set.empty[Id[Comment]],
(_, comment: Comment, commentIds: Set[Id[Comment]]) =>
if (comment.deleted) commentIds - comment.id else commentIds + comment.id,
Materialized.`with`[Id[Post], Set[Id[Comment]], KeyValueStore[Bytes, Array[Byte]]](
CirceSerdes.serde[Id[Post]],
CirceSerdes.serde[Set[Id[Comment]]]
)
// Denormalisation pipeline:
//   1. join each author's posts with the author's `User`, wrapping every post in a
//      `DenormalisedPost` whose interactions start as `Interactions(Set.empty, 0)`;
//   2. turn the table into a stream and flatten the collection into one record per post;
//   3. re-key by `post.id` and keep, per post, the version with the later `post.updatedOn`
//      (the second argument wins when the first is not strictly after it);
//   4. left-join the per-post likes.
// NOTE(review): the joiner is typed against `Set[Post]`, while the visible initializer of
// `postsByAuthor` is `Map.empty[Id[Post], Post]` — confirm which one is current.
// NOTE(review): this excerpt stops inside the `leftJoin` joiner; its body is not visible here.
postsByAuthor
.join(usersByKey,
(posts: Set[Post], author: User) =>
posts.map(DenormalisedPost(_, author, DenormalisedPost.Interactions(Set.empty, 0))))
.toStream
.flatMapValues(identity)
.groupBy((_, denormalisedPost) => denormalisedPost.post.id)
.reduce((first, second) => if (first.post.updatedOn.isAfter(second.post.updatedOn)) first else second)
.leftJoin(likesByKey,
(denormalisedPost: DenormalisedPost, likes: Set[Like]) =>
/**
 * Syntax extension for `KStreamS` that writes a stream to the topic carried by an implicit
 * `Record[K, V]`, so call sites can end a chain with `.toTopic` instead of naming the topic.
 */
implicit class KStreamOps[K, V](stream: KStreamS[K, V]) {

  /**
   * Sends the stream to the topic named by `record.topic`.
   *
   * @param record   supplies the destination topic name
   * @param produced required in implicit scope; it is not referenced by name in this body,
   *                 presumably it is picked up implicitly by `to` (confirm against `KStreamS`)
   * @return whatever `stream.to` returns
   */
  def toTopic(implicit record: Record[K, V], produced: Produced[K, V]) = {
    val topic = record.topic
    stream.to(topic)
  }
}
// Usage fragment (the surrounding chain is elided with `...`): a table is converted back to a
// stream and written out via `toTopic`, which takes its destination from the implicit `Record`.
...
.toStream
.toTopic
...
// Fragment (receiver elided): fills in the interactions of each `DenormalisedPost` using
// nested `copy` calls — first the likes from `likesByKey`, then the comment count from
// `commentCountByKey`. Every other field of the post and of its interactions is carried over.
// NOTE(review): the joiner takes `commentCount: Int`, whereas the visible aggregate behind
// `commentCountByKey` builds a `Set[Id[Comment]]` — confirm the conversion exists upstream.
.join(likesByKey,
(denormalisedPost: DenormalisedPost, likes: Set[Like]) =>
denormalisedPost.copy(interactions = denormalisedPost.interactions.copy(likes = likes)))
.join(commentCountByKey,
(denormalisedPost: DenormalisedPost, commentCount: Int) =>
denormalisedPost.copy(interactions = denormalisedPost.interactions.copy(comments = commentCount)))
// Fragment (receiver elided): the same two joins, with the nested `copy` calls replaced by
// `lens(...).set(...)` on the path to `interactions.likes` and `interactions.comments`.
// NOTE(review): `lens` is not defined in this excerpt — presumably a lens library's syntax
// import; confirm which one is in scope.
...
.join(likesByKey,
(denormalisedPost: DenormalisedPost, likes: Set[Like]) =>
denormalisedPost.lens(_.interactions.likes).set(likes))
.join(commentCountByKey,
(denormalisedPost: DenormalisedPost, commentCount: Int) =>
denormalisedPost.lens(_.interactions.comments).set(commentCount))