Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(jsc,
- String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);
- directKafkaStream.foreachRDD(rdd ->{
- rdd.foreach(avroRecord -> {
- byte[] encodedAvroData = avroRecord._2;
- LocationType t = deserialize(encodedAvroData);
- MongoClientOptions.Builder options_builder = new MongoClientOptions.Builder();
- options_builder.maxConnectionIdleTime(60000);
- MongoClientOptions options = options_builder.build();
- MongoClient mongo = new MongoClient ("localhost:27017", options);
- MongoDatabase database = mongo.getDatabase("DB");
- MongoCollection<Document> collection = database.getCollection("collection");
- Document myDoc = collection.find(eq("key", 4)).first();
- System.out.println(myDoc);
- });
- });
Add Comment
Please, Sign In to add comment