Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import io.javalin.Javalin;
- import org.apache.kafka.streams.KafkaStreams;
- import org.apache.kafka.streams.StoreQueryParameters;
- import org.apache.kafka.streams.state.QueryableStoreTypes;
- import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
- public class InventoryRestService {
- private final KafkaStreams streams;
- public InventoryRestService(KafkaStreams streams) {
- this.streams = streams;
- }
- public void start() {
- var app = Javalin.create().start(7777);
- app.get("/inventory/{productId}", ctx -> {
- String productId = ctx.pathParam("productId");
- ReadOnlyKeyValueStore<String, Integer> keyValueStore = streams.store(
- StoreQueryParameters.fromNameAndType("inventory-store", QueryableStoreTypes.keyValueStore())
- );
- var quantity = keyValueStore.get(productId);
- if (quantity != null) {
- ctx.json(new InventoryResponse(productId, quantity));
- } else {
- ctx.status(404).result("Product not found");
- }
- });
- }
- public record InventoryResponse(String productId, int quantity) {
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment