Guest User

Untitled

a guest
May 24th, 2018
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.64 KB | None | 0 0
  1. val likesByKey = likesStream
  2. .groupByKey
  3. .aggregate(
  4. () => Set.empty[Like],
  5. (_, like: Like, likes: Set[Like]) => if (like.unliked) likes - like else likes + like,
  6. Materialized.`with`[Id[Post], Set[Like], KeyValueStore[Bytes, Array[Byte]]](
  7. CirceSerdes.serde[Id[Post]],
  8. CirceSerdes.serde[Set[Like]]
  9. )
  10. )
  11.  
  12. val commentCountByKey = commentsStream
  13. .groupByKey
  14. .aggregate(
  15. () => 0,
  16. (_, comment: Comment, count: Int) => if (comment.deleted) count - 1 else count + 1,
  17. Materialized.`with`[Id[Post], Int, KeyValueStore[Bytes, Array[Byte]]](
  18. CirceSerdes.serde[Id[Post]],
  19. CirceSerdes.serde[Int]
  20. )
  21. )
Add Comment
Please, Sign In to add comment