Guest User

Untitled

a guest
May 24th, 2018
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.43 KB | None | 0 0
  1. val commentCountByKey = commentsStream
  2. .groupByKey
  3. .aggregate(
  4. () => Set.empty[Id[Comment]],
  5. (_, comment: Comment, commentIds: Set[Id[Comment]]) =>
  6. if (comment.deleted) commentIds - comment.id else commentIds + comment.id,
  7. Materialized.`with`[Id[Post], Set[Id[Comment]], KeyValueStore[Bytes, Array[Byte]]](
  8. CirceSerdes.serde[Id[Post]],
  9. CirceSerdes.serde[Set[Id[Comment]]]
  10. )
  11. )
  12. .mapValues(_.size)
Add Comment
Please, Sign In to add comment