Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
- import org.apache.kafka.common.serialization.Serde;
- import org.apache.kafka.common.serialization.Serdes;
- import org.apache.kafka.common.utils.Bytes;
- import org.apache.kafka.streams.StreamsBuilder;
- import org.apache.kafka.streams.Topology;
- import org.apache.kafka.streams.kstream.Consumed;
- import org.apache.kafka.streams.kstream.GlobalKTable;
- import org.apache.kafka.streams.kstream.Grouped;
- import org.apache.kafka.streams.kstream.KGroupedStream;
- import org.apache.kafka.streams.kstream.KStream;
- import org.apache.kafka.streams.kstream.KTable;
- import org.apache.kafka.streams.kstream.Materialized;
- import org.apache.kafka.streams.kstream.Produced;
- import org.apache.kafka.streams.state.KeyValueStore;
- public class InventoryProcessor {
- private final StreamsBuilder builder;
- private static final String RESTOCK_TOPIC = "restock-events-topic";
- private static final String SALE_TOPIC = "sale-events-topic";
- private static final String PRODUCT_INFO_TOPIC = "product-info-topic";
- private static final String INVENTORY_OUTPUT_TOPIC = "inventory-output-topic";
- public InventoryProcessor() {
- this.builder = new StreamsBuilder();
- }
- public Topology buildTopology() {
- // Сериализаторы и десериализаторы
- Serde<String> stringSerde = Serdes.String();
- Serde<RestockEvent> restockEventSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(RestockEvent.class));
- Serde<SaleEvent> saleEventSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(SaleEvent.class));
- Serde<ProductInfo> productInfoSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(ProductInfo.class));
- Serde<InventoryChange> inventoryChangeSerde = Serdes.serdeFrom(new GsonSerializer<>(), new GsonDeserializer<>(InventoryChange.class));
- Serde<Integer> integerSerde = Serdes.Integer();
- // Чтение потоков
- KStream<String, RestockEvent> restockStream = builder.stream(RESTOCK_TOPIC, Consumed.with(stringSerde, restockEventSerde));
- KStream<String, SaleEvent> saleStream = builder.stream(SALE_TOPIC, Consumed.with(stringSerde, saleEventSerde));
- GlobalKTable<String, ProductInfo> productInfoTable = builder.globalTable(PRODUCT_INFO_TOPIC, Consumed.with(stringSerde, productInfoSerde));
- // Обогащение данных о поступлениях информацией о товаре
- KStream<String, RestockEvent> enrichedRestockStream = restockStream.join(
- productInfoTable,
- (key, value) -> value.getProductId(),
- (restockEvent, productInfo) -> {
- // Обогащаем restockEvent при необходимости
- return restockEvent;
- }
- );
- // Обогащение данных о продажах информацией о товаре
- KStream<String, SaleEvent> enrichedSaleStream = saleStream.join(
- productInfoTable,
- (key, value) -> value.getProductId(),
- (saleEvent, productInfo) -> {
- // Обогащаем saleEvent при необходимости
- return saleEvent;
- }
- );
- // Создаем класс InventoryChange
- KStream<String, InventoryChange> restockChanges = enrichedRestockStream
- .mapValues(restockEvent ->
- new InventoryChange(restockEvent.getProductId(), restockEvent.getQuantity())
- );
- KStream<String, InventoryChange> saleChanges = enrichedSaleStream
- .mapValues(saleEvent ->
- new InventoryChange(saleEvent.getProductId(), -saleEvent.getQuantity())
- );
- // Объединяем поступления и продажи
- KStream<String, InventoryChange> inventoryChanges = restockChanges.merge(saleChanges);
- // Группируем по productId
- KGroupedStream<String, InventoryChange> groupedInventory = inventoryChanges.groupByKey(Grouped.with(stringSerde, inventoryChangeSerde));
- // Агрегируем изменения
- KTable<String, Integer> inventoryState = groupedInventory.aggregate(
- () -> 0,
- (key, value, aggregate) -> aggregate + value.getQuantityChange(),
- Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("inventory-store")
- .withKeySerde(stringSerde)
- .withValueSerde(integerSerde)
- );
- // Выводим состояние в выходной топик
- inventoryState.toStream().to(INVENTORY_OUTPUT_TOPIC, Produced.with(stringSerde, integerSerde));
- return builder.build();
- }
- }
Advertisement
Add Comment
Please sign in to add a comment