Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- val commentCountByKey = commentsStream
- .groupByKey
- .aggregate(
- () => Set.empty[Id[Comment]],
- (_, comment: Comment, commentIds: Set[Id[Comment]]) =>
- if (comment.deleted) commentIds - comment.id else commentIds + comment.id,
- Materialized.`with`[Id[Post], Set[Id[Comment]], KeyValueStore[Bytes, Array[Byte]]](
- CirceSerdes.serde[Id[Post]],
- CirceSerdes.serde[Set[Id[Comment]]]
- )
- )
- .mapValues(_.size)
Add Comment
Please, Sign In to add comment