Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- private KTable<String, theDataList> globalStream() {
- // KStream of records from data-in topic using String and theDataSerde deserializers
- KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));
- // Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
- KGroupedStream<String, Data> KGS = trashStream.groupByKey();
- Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
- materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
- // Return a KTable
- return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
- if (!value.getValideData())
- aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
- else
- aggregate.getList().add(value);
- return aggregate;
- }, materialized);
- }
Add Comment
Please, Sign In to add comment