Guest User

Untitled

a guest
Nov 23rd, 2017
67
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.09 KB | None | 0 0
  /** Row shape whose reflected schema is used as the explicit read schema for the Mongo collections.
    *
    * Both fields are `Option[String]`, so the schema derived from this class marks `userId` and `id`
    * as nullable string columns.
    *
    * @param userId the document's user id, if present
    * @param id     the document's id, if present
    */
  final case class TestClass(userId: Option[String], id: Option[String])
  2.  
  3.  
  // Reads three collections for TENANT (coll1, coll2, coll3 -- names not defined in this paste)
  // using a schema derived from TestClass, unions them, selects/aggregates, and saves the result
  // to "Final-collectionName" via mongoUtil.saveDataframeToMongoDB.
  // NOTE(review): procedure syntax `def f(...) { ... }` (no `=`) yields Unit and is deprecated in
  // Scala 2.13; prefer `def testMethod(TENANT: String): Unit = { ... }`.
  4. def testMethod(TENANT: String) {
  5. val sparkConf = new SparkConf()
  // getOrCreate returns the already-active session if one exists. Note that getCollectionFromMongoDB
  // obtains its session from AppConfig.spark instead -- consider using a single source.
  6. val spark = SparkSession.builder.config(sparkConf).getOrCreate()
  // Supplies the Symbol-to-Column conversion used by 'userId / 'id below.
  7. import spark.implicits._
  8.  
  // Schema with fields userId and id, both nullable because they are Option[String] in TestClass.
  // NOTE(review): ScalaReflection lives in Spark's internal catalyst package; the public
  // equivalent is Encoders.product[TestClass].schema.
  9. val test_struct = ScalaReflection.schemaFor[TestClass].dataType.asInstanceOf[StructType]
  10.  
  // NOTE(review): collection1..3 are never reassigned here, so `val` suffices.
  // NOTE(review): each repartition('userId) adds a shuffle; union does not generally preserve its
  // inputs' partitioning, so the later aggregation may shuffle again anyway -- check whether these
  // repartitions can be dropped.
  11. var collection1 = mongoUtil.getCollectionFromMongoDB(TENANT,coll1,test_struct).repartition('userId)
  12. var collection2 = mongoUtil.getCollectionFromMongoDB(TENANT,coll2,test_struct).repartition('userId)
  13. var collection3 = mongoUtil.getCollectionFromMongoDB(TENANT,coll3,test_struct).repartition('userId)
  14.  
  15.  
  // NOTE(review): the `...` lines in this expression are elided in the paste and are not valid Scala.
  // NOTE(review): if no groupBy precedes `.agg` in the elided part, this is a global aggregation that
  // yields a single row containing only "no_of_posts" (the selected "user_id" is dropped). If a
  // per-user count was intended, group by "user_id" before `.agg` -- confirm intent.
  16. val finalDataFrame =collection1.union(collection2).union(collection3).select(
  17. 'id as "id",
  18. 'userId as "user_id"
  19. ...
  20. ...
  21. ...
  22. )
  23. .agg(countDistinct('id).cast(IntegerType) as "no_of_posts")
  24.  
  25.  
  // The Boolean returned by saveDataframeToMongoDB is discarded here.
  26. mongoUtil.saveDataframeToMongoDB(TENANT, "Final-collectionName", finalDataFrame)
  27.  
  28.  
  29. }
  30.  
  /** Writes `df` to a MongoDB collection for `TENANT`, overwriting the collection.
    *
    * The connection URI and database are read from `conf` under the keys
    * `mongodb.<TENANT>.uri` and `mongodb.<TENANT>.database`.
    *
    * @param TENANT         tenant key used to look up the MongoDB connection settings
    * @param collectionName target collection name
    * @param df             data to write, saved with `SaveMode.Overwrite`
    * @return `true` once `MongoSpark.save` returns. This method never returns `false`: a failing
    *         config lookup or write surfaces as an exception thrown to the caller.
    */
  def saveDataframeToMongoDB(TENANT: String, collectionName: String, df: DataFrame): Boolean = {
    // No SparkSession is needed here: the write goes through `df.write`, which is bound to
    // df's own session.
    val mongoOptions = Map(
      "uri"        -> conf.getString(s"mongodb.$TENANT.uri"),
      "database"   -> conf.getString(s"mongodb.$TENANT.database"),
      "collection" -> collectionName
    )
    MongoSpark.save(df.write.mode(SaveMode.Overwrite), WriteConfig(mongoOptions))
    true
  }
  43.  
  44.  
  /** Loads one MongoDB collection for `TENANT` as a DataFrame through the MongoDB Spark connector.
    *
    * Connection settings come from `conf` under `mongodb.<TENANT>.uri` and
    * `mongodb.<TENANT>.database`; the session comes from `AppConfig.spark`.
    *
    * @param TENANT         tenant key used to look up the connection settings
    * @param collectionName collection to read
    * @param schema         explicit read schema; `null` (the default) means no schema is set on the
    *                       reader, leaving schema inference to the connector
    * @return the DataFrame produced by the reader's `load()`
    */
  def getCollectionFromMongoDB(TENANT: String, collectionName: String, schema: StructType = null): DataFrame = {
    val spark = AppConfig.spark
    val mongoOptions = Map(
      "uri"        -> conf.getString(s"mongodb.$TENANT.uri"),
      "database"   -> conf.getString(s"mongodb.$TENANT.database"),
      "collection" -> collectionName
    )
    val reader = spark.read.format("com.mongodb.spark.sql")
    // The parameter keeps its `null` default for caller compatibility. Lifting it into an Option
    // means a missing schema is expressed by not calling `.schema`, rather than by passing null.
    Option(schema)
      .fold(reader)(s => reader.schema(s))
      .options(mongoOptions)
      .load()
  }
Add Comment
Please sign in to add a comment.