Guest User

Untitled

a guest
Dec 17th, 2017
104
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.66 KB | None | 0 0
  1. package org.apache.kafka.test;
  2.  
  3. import org.apache.kafka.clients.consumer.ConsumerConfig;
  4. import org.apache.kafka.common.serialization.Serdes;
  5. import org.apache.kafka.streams.StreamsConfig;
  6. import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
  7.  
  8. import java.util.Properties;
  9.  
  10. public class MiscTest {
  11.  
  12. private static Properties getProperties() {
  13. Properties settings = new Properties();
  14. settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "appid");
  15. settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
  16. "localhost:9092");
  17. settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
  18. "localhost:2181");
  19. settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
  20. Serdes.String().getClass());
  21. settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
  22. Serdes.String().getClass());
  23.  
  24. settings.put(StreamsConfig.STATE_DIR_CONFIG,"/tmp/customerStoreLocal6");
  25. settings.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  26. WallclockTimestampExtractor.class);
  27.  
  28. settings.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,"exactly_once");
  29. settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
  30. settings.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10 * 1024 * 1024L);
  31. return settings;
  32. }
  33.  
  34. public static void main(String[] args) {
  35.  
  36. System.out.println(getProperties().getProperty(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));
  37.  
  38. StreamsConfig config = new StreamsConfig(getProperties());
  39.  
  40. System.out.println(config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG));
  41. }
  42. }
Add Comment
Please, Sign In to add comment