Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- val likesByKey = likesStream
- .groupByKey
- .aggregate(
- () => Set.empty[Like],
- (_, like: Like, likes: Set[Like]) => if (like.unliked) likes - like else likes + like,
- Materialized.`with`[Id[Post], Set[Like], KeyValueStore[Bytes, Array[Byte]]](
- CirceSerdes.serde[Id[Post]],
- CirceSerdes.serde[Set[Like]]
- )
- )
- val commentCountByKey = commentsStream
- .groupByKey
- .aggregate(
- () => 0,
- (_, comment: Comment, count: Int) => if (comment.deleted) count - 1 else count + 1,
- Materialized.`with`[Id[Post], Int, KeyValueStore[Bytes, Array[Byte]]](
- CirceSerdes.serde[Id[Post]],
- CirceSerdes.serde[Int]
- )
- )
Add Comment
Please, Sign In to add comment