Guest User

Untitled

a guest
Dec 11th, 2018
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.99 KB | None | 0 0
  1. private KTable<String, theDataList> globalStream() {
  2.  
  3. // KStream of records from data-in topic using String and theDataSerde deserializers
  4. KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));
  5.  
  6. // Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
  7. KGroupedStream<String, Data> KGS = trashStream.groupByKey();
  8.  
  9. Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
  10. materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
  11.  
  12. // Return a KTable
  13. return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
  14. if (!value.getValideData())
  15. aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
  16. else
  17. aggregate.getList().add(value);
  18. return aggregate;
  19. }, materialized);
  20. }
Add Comment
Please, Sign In to add comment