Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package org.apache.kafka.test;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.common.serialization.Serdes;
- import org.apache.kafka.streams.StreamsConfig;
- import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
- import java.util.Properties;
- public class MiscTest {
- private static Properties getProperties() {
- Properties settings = new Properties();
- settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "appid");
- settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
- "localhost:9092");
- settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
- "localhost:2181");
- settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
- Serdes.String().getClass());
- settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
- Serdes.String().getClass());
- settings.put(StreamsConfig.STATE_DIR_CONFIG,"/tmp/customerStoreLocal6");
- settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
- WallclockTimestampExtractor.class);
- settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,"exactly_once");
- settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
- settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10 * 1024 * 1024L);
- return settings;
- }
- public static void main(String[] args) {
- System.out.println(getProperties().getProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));
- StreamsConfig config = new StreamsConfig(getProperties());
- System.out.println(config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));
- }
- }
Add Comment
Please, Sign In to add comment