Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- val usersByKey = usersStream
- .groupByKey
- .reduce((first, second) => if (first.updatedOn.isAfter(second.updatedOn)) first else second)
- val postsByAuthor = postsStream
- .groupBy((_, post) => post.author)
- .aggregate(
- () => Map.empty[Id[Post], Post],
- (_, post: Post, posts: Map[Id[Post], Post]) =>
- if (posts.get(post.id).exists(_.updatedOn.isAfter(post.updatedOn))) posts
- else posts + (post.id -> post),
- Materialized.`with`[Id[User], Map[Id[Post], Post], KeyValueStore[Bytes, Array[Byte]]](
- CirceSerdes.serde[Id[User]],
- CirceSerdes.serde[Map[Id[Post], Post]]
- )
- )
- .mapValues(_.values.toSet)
Add Comment
Please, Sign In to add comment