Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import java.util.Properties;
- import org.apache.kafka.streams.KafkaStreams;
- import org.apache.kafka.streams.StreamsConfig;
- import org.apache.kafka.streams.Topology;
- public class InventoryApp {
- public static void main(String[] args) {
- var processor = new InventoryProcessor();
- var topology = processor.buildTopology();
- var streams = getKafkaStreams(topology);
- streams.start();
- var restService = new InventoryRestService(streams);
- restService.start();
- Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
- }
- private static KafkaStreams getKafkaStreams(Topology topology) {
- var props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, "inventory-app");
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "org.apache.kafka.common.serialization.Serdes$StringSerde");
- return new KafkaStreams(topology, props);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment