// Spring configuration: SparkConf with the MongoDB Spark Connector properties.
// The input/output URIs name only the host; the database is set separately and
// no collection is configured here.
@Bean
public SparkConf sparkConf() {
    return new SparkConf()
            .setMaster("local[*]")
            .setAppName("test")
            .set("spark.app.id", "test")
            .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/")
            .set("spark.mongodb.output.uri", "mongodb://127.0.0.1/")
            .set("spark.mongodb.input.database", "myDataBase")
            .set("spark.mongodb.output.database", "myDataBase");
}

@Bean
public JavaSparkContext javaSparkContext() {
    return new JavaSparkContext(sparkConf());
}

@Bean
public SQLContext sqlContext() {
    return new SQLContext(SparkSession
            .builder()
            .appName("eat")
            .master("local[*]")
            .config(sparkConf())
            .getOrCreate());
}

<dependency>
    <groupId>org.mongodb.spark</groupId>
    <artifactId>mongo-spark-connector_2.11</artifactId>
    <version>2.0.0</version>
</dependency>

// Failing attempt: the ReadConfig is given an *output* collection option,
// so no input collection is configured and the load below fails with:
ReadConfig readConfig = ReadConfig.create(sparkContext)
        .withOption("spark.mongodb.output.collection", "myCollection");
JavaRDD<Document> rdd = MongoSpark.load(sparkContext, readConfig);

  38. "Missing collection name. Set via the 'spark.mongodb.input.uri'
  39. or 'spark.mongodb.input.collection' property"
  40.  
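The message points to two properties that can supply the collection. One route is to set 'spark.mongodb.input.collection' directly; a minimal sketch, reusing the host and database from the first configuration (the collection name "myCollection" is taken from the ReadConfig attempt above):

new SparkConf()
        .setMaster("local[*]")
        .setAppName("test")
        .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/")
        .set("spark.mongodb.input.database", "myDataBase")
        .set("spark.mongodb.input.collection", "myCollection");

The other route is to embed the database and collection in the URIs:
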
new SparkConf()
        .setMaster("local[*]")
        .setAppName("test")
        .set("spark.app.id", "test")
        .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/myDataBase.myCollection")
        .set("spark.mongodb.output.uri", "mongodb://127.0.0.1/myDataBase.myCollection");

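A minimal read sketch against that corrected configuration (the count() call is only an illustrative action; MongoSpark.load(jsc) is the same call used in the Connector example further down):

SparkConf conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("test")
        .set("spark.mongodb.input.uri", "mongodb://127.0.0.1/myDataBase.myCollection");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaMongoRDD<Document> rdd = MongoSpark.load(jsc); // resolves the collection from spark.mongodb.input.uri
System.out.println(rdd.count());
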
// Working Spring Boot configuration: the collection is supplied through the separate input properties.
@SpringBootConfiguration
public class SparkConfiguration {

    private final String MONGO_PREFIX = "mongodb://";
    private final String MONGO_INPUT_COLLECTION = "faqs";

    @Value(value = "${spring.data.mongodb.name}")
    private String mongoName;

    @Value(value = "${spring.data.mongodb.net.bindIp}")
    private String mongoHost;

    @Bean
    public SparkSession sparkSession() {
        return SparkSession.builder()
                .master("local[*]")
                .appName("eat-spark-cluster")
                .config("spark.app.id", "Eat")
                .config("spark.mongodb.input.uri", MONGO_PREFIX.concat(mongoHost).concat("/"))
                .config("spark.mongodb.input.database", mongoName)
                .config("spark.mongodb.input.collection", MONGO_INPUT_COLLECTION)
                .getOrCreate();
    }

    @Bean
    public JavaSparkContext javaSparkContext() {
        return JavaSparkContext.fromSparkContext(sparkSession().sparkContext());
    }
}

// Per-read override: pass the collection to ReadConfig with the short "collection" option key.
ReadConfig readConfig = ReadConfig.create(getJavaSparkContext()).withOption("collection", "my_collection");
JavaMongoRDD<Document> placesRdd = MongoSpark.load(getJavaSparkContext(), readConfig);

return placesRdd.collect();

package mongo;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.WriteConfig;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

// Standalone example: read from and write to MongoDB through the Spark connector.
public class Connector {
    String db1 = "mongodb://127.0.0.1/";
    String db2 = "mongodb://192.168.4.180/";
    String dbUrl = db1;

    String user = "";
    String pass = "";
    String dbName = "test";
    String collName = "spark";

    public static void main(String[] args) {
        Connector con = new Connector();
        JavaSparkContext jsc = con.connection();
        // con.writeToMongo(jsc);
        con.readFromMongo(jsc);

        // Keep the driver alive until a key is pressed (e.g. to inspect the Spark UI).
        Scanner sc = new Scanner(System.in);
        sc.next();
    }

    // Build a local SparkSession with the input/output URIs and collection, then wrap its context.
    JavaSparkContext connection() {
        SparkSession ss = SparkSession.builder()
                .master("local")
                .appName("MongoConnector")
                .config("spark.mongodb.input.uri", dbUrl + dbName)
                .config("spark.mongodb.output.uri", dbUrl + dbName)
                .config("spark.mongodb.output.collection", collName)
                .config("spark.mongodb.input.collection", collName)
                .getOrCreate();

        JavaSparkContext jsc = new JavaSparkContext(ss.sparkContext());
        return jsc;
        // jsc.close();
    }

    // Load the configured collection and print its documents.
    void readFromMongo(JavaSparkContext jsc) {
        JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
        System.out.print(rdd.collect());
    }

    // Parallelize a few documents and save them with an explicit WriteConfig.
    void writeToMongo(JavaSparkContext jsc) {
        JavaRDD<Document> rdd = jsc.parallelize(Arrays.asList(1, 2, 3))
                .map(x -> Document.parse("{spark: " + x + "}"));

        Map<String, String> writeconf = new HashMap<String, String>();
        writeconf.put("collection", "spark");
        writeconf.put("writeConcern.w", "majority");

        WriteConfig writeConfig = WriteConfig.create(jsc).withOptions(writeconf);
        MongoSpark.save(rdd, writeConfig);
    }
}