Advertisement
Guest User

Untitled

a guest
Aug 4th, 2018
388
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.47 KB | None | 0 0
  1. package com.github.masterries.kafka;
  2.  
  3. import com.google.gson.JsonObject;
  4. import com.google.gson.JsonParser;
  5. import org.apache.kafka.clients.consumer.ConsumerConfig;
  6. import org.apache.kafka.clients.consumer.ConsumerRecord;
  7. import org.apache.kafka.clients.consumer.ConsumerRecords;
  8. import org.apache.kafka.clients.consumer.KafkaConsumer;
  9. import org.json.simple.JSONObject;
  10. import org.json.simple.parser.JSONParser;
  11. import org.json.simple.parser.ParseException;
  12.  
  13. import java.util.Arrays;
  14. import java.util.Properties;
  15. import java.util.UUID;
  16.  
  17. import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
  18. import static org.apache.kafka.streams.StreamsConfig.BOOTSTRAP_SERVERS_CONFIG;
  19.  
  20. public class Consumer_A {
  21.  
  22.  
  23.  
  24.  
  25.     public static void main(String[] args) throws InterruptedException {
  26.  
  27.         Properties props = new Properties();
  28.         props.put(BOOTSTRAP_SERVERS_CONFIG, "192.168.56.3:9092");
  29.         props.put(GROUP_ID_CONFIG, "i");
  30.         props.put(ENABLE_AUTO_COMMIT_CONFIG, "true");
  31.         props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  32.         props.put(SESSION_TIMEOUT_MS_CONFIG, "30000");
  33.         props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  34.         props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
  35.         props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
  36.         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "your_client_id");
  37.         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  38.  
  39.         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  40.  
  41.         consumer.subscribe(Arrays.asList("FeaturesOfInterest"));
  42.  
  43.         System.out.println("Consumer A gestartet!");
  44.  
  45.  
  46.         JSONObject obj;
  47.  
  48.         while(true) {
  49.  
  50.             ConsumerRecords<String, String> records = consumer.poll(1000);
  51.             if (records.count() == 0)
  52.                 continue;
  53.  
  54.             for (ConsumerRecord<String, String> record : records){
  55.  
  56.                 String result = record.value();
  57.                 int i = result.indexOf("{");
  58.                 result = result.substring(i);
  59.                 try {
  60.                     JSONObject jo = (JSONObject) new JSONParser().parse(result.toString());
  61.                     System.out.println(jo.get("@iot.id"));
  62.                 } catch (ParseException e) {
  63.                     e.printStackTrace();
  64.                 }
  65.  
  66.  
  67.  
  68.             }
  69.  
  70.  
  71.         }
  72.     }
  73. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement