Guest User

Untitled

a guest
May 24th, 2018
93
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.62 KB | None | 0 0
  1. val usersByKey = usersStream
  2. .groupByKey
  3. .reduce((first, second) => if (first.updatedOn.isAfter(second.updatedOn)) first else second)
  4.  
  5. val postsByAuthor = postsStream
  6. .groupBy((_, post) => post.author)
  7. .aggregate(
  8. () => Map.empty[Id[Post], Post],
  9. (_, post: Post, posts: Map[Id[Post], Post]) =>
  10. if (posts.get(post.id).exists(_.updatedOn.isAfter(post.updatedOn))) posts
  11. else posts + (post.id -> post),
  12. Materialized.`with`[Id[User], Map[Id[Post], Post], KeyValueStore[Bytes, Array[Byte]]](
  13. CirceSerdes.serde[Id[User]],
  14. CirceSerdes.serde[Map[Id[Post], Post]]
  15. )
  16. )
  17. .mapValues(_.values.toSet)
Add Comment
Please, Sign In to add comment