Guest User

Untitled

a guest
Nov 22nd, 2017
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.94 KB | None | 0 0
  1. JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc,
  2. String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);
  3.  
  4. directKafkaStream.foreachRDD(rdd ->{
  5.  
  6. rdd.foreach(avroRecord -> {
  7.  
  8. byte[] encodedAvroData = avroRecord._2;
  9. LocationType t = deserialize(encodedAvroData);
  10.  
  11. MongoClientOptions.Builder options_builder = new MongoClientOptions.Builder();
  12. options_builder.maxConnectionIdleTime(60000);
  13. MongoClientOptions options = options_builder.build();
  14. MongoClient mongo = new MongoClient ("localhost:27017", options);
  15.  
  16. MongoDatabase database = mongo.getDatabase("DB");
  17. MongoCollection<Document> collection = database.getCollection("collection");
  18.  
  19. Document myDoc = collection.find(eq("key", 4)).first();
  20. System.out.println(myDoc);
  21.  
  22. });
  23. });
Add Comment
Please, Sign In to add comment