temirlan100

Untitled

Dec 2nd, 2024
58
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 4.85 KB | None | 0 0
  1. import org.apache.kafka.common.serialization.Serde;
  2. import org.apache.kafka.common.serialization.Serdes;
  3. import org.apache.kafka.common.utils.Bytes;
  4. import org.apache.kafka.streams.StreamsBuilder;
  5. import org.apache.kafka.streams.Topology;
  6. import org.apache.kafka.streams.kstream.Consumed;
  7. import org.apache.kafka.streams.kstream.GlobalKTable;
  8. import org.apache.kafka.streams.kstream.Grouped;
  9. import org.apache.kafka.streams.kstream.KGroupedStream;
  10. import org.apache.kafka.streams.kstream.KStream;
  11. import org.apache.kafka.streams.kstream.KTable;
  12. import org.apache.kafka.streams.kstream.Materialized;
  13. import org.apache.kafka.streams.kstream.Produced;
  14. import org.apache.kafka.streams.state.KeyValueStore;
  15.  
  16. public class InventoryProcessor {
  17.  
  18.     private final StreamsBuilder builder;
  19.     private static final String RESTOCK_TOPIC = "restock-events-topic";
  20.     private static final String SALE_TOPIC = "sale-events-topic";
  21.     private static final String PRODUCT_INFO_TOPIC = "product-info-topic";
  22.     private static final String INVENTORY_OUTPUT_TOPIC = "inventory-output-topic";
  23.  
  24.     public InventoryProcessor() {
  25.         this.builder = new StreamsBuilder();
  26.     }
  27.  
  28.     public Topology buildTopology() {
  29.         // Сериализаторы и десериализаторы
  30.         Serde<String> stringSerde = Serdes.String();
  31.         Serde<RestockEvent> restockEventSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(RestockEvent.class));
  32.         Serde<SaleEvent> saleEventSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(SaleEvent.class));
  33.         Serde<ProductInfo> productInfoSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(ProductInfo.class));
  34.         Serde<InventoryChange> inventoryChangeSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(InventoryChange.class));
  35.         Serde<Integer> integerSerde = Serdes.Integer();
  36.  
  37.         // Чтение потоков
  38.         KStream<String, RestockEvent> restockStream = builder.stream(RESTOCK_TOPIC, Consumed.with(stringSerde, restockEventSerde));
  39.         KStream<String, SaleEvent> saleStream = builder.stream(SALE_TOPIC, Consumed.with(stringSerde, saleEventSerde));
  40.         GlobalKTable<String, ProductInfo> productInfoTable = builder.globalTable(PRODUCT_INFO_TOPIC, Consumed.with(stringSerde, productInfoSerde));
  41.  
  42.         // Обогащение данных о поступлениях информацией о товаре
  43.         KStream<String, RestockEvent> enrichedRestockStream = restockStream.join(
  44.                 productInfoTable,
  45.                 (key, value) -> value.getProductId(),
  46.                 (restockEvent, productInfo) -> {
  47.                     // Обогащаем restockEvent при необходимости
  48.                     return restockEvent;
  49.                 }
  50.         );
  51.  
  52.         // Обогащение данных о продажах информацией о товаре
  53.         KStream<String, SaleEvent> enrichedSaleStream = saleStream.join(
  54.                 productInfoTable,
  55.                 (key, value) -> value.getProductId(),
  56.                 (saleEvent, productInfo) -> {
  57.                     // Обогащаем saleEvent при необходимости
  58.                     return saleEvent;
  59.                 }
  60.         );
  61.  
  62.         // Создаем класс InventoryChange
  63.         KStream<String, InventoryChange> restockChanges = enrichedRestockStream
  64.                 .mapValues(restockEvent ->
  65.                         new InventoryChange(restockEvent.getProductId(), restockEvent.getQuantity())
  66.                 );
  67.  
  68.         KStream<String, InventoryChange> saleChanges = enrichedSaleStream
  69.                 .mapValues(saleEvent ->
  70.                         new InventoryChange(saleEvent.getProductId(), -saleEvent.getQuantity())
  71.                 );
  72.  
  73.         // Объединяем поступления и продажи
  74.         KStream<String, InventoryChange> inventoryChanges = restockChanges.merge(saleChanges);
  75.  
  76.         // Группируем по productId
  77.         KGroupedStream<String, InventoryChange> groupedInventory = inventoryChanges.groupByKey(Grouped.with(stringSerde, inventoryChangeSerde));
  78.  
  79.         // Агрегируем изменения
  80.         KTable<String, Integer> inventoryState = groupedInventory.aggregate(
  81.                 () -> 0,
  82.                 (key, value, aggregate) -> aggregate + value.getQuantityChange(),
  83.                 Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("inventory-store")
  84.                         .withKeySerde(stringSerde)
  85.                         .withValueSerde(integerSerde)
  86.         );
  87.  
  88.         // Выводим состояние в выходной топик
  89.         inventoryState.toStream().to(INVENTORY_OUTPUT_TOPIC, Produced.with(stringSerde, integerSerde));
  90.  
  91.         return builder.build();
  92.     }
  93. }
Advertisement
Add Comment
Please, Sign In to add comment