Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Record with two optional string fields; `testMethod` derives a Spark
 * `StructType` from it via `ScalaReflection.schemaFor`.
 *
 * @param userId optional `userId` value
 * @param id     optional `id` value
 */
case class TestClass(
  userId: Option[String],
  id: Option[String]
)
// Reads three Mongo collections for the given tenant, unions them, aggregates a
// distinct count of `id`, and writes the result to the "Final-collectionName" collection.
//
// TENANT is forwarded unchanged to the mongoUtil read/write helpers.
//
// NOTE(review): procedure syntax (`def f(...) { ... }` without `: Unit =`) is deprecated
// in newer Scala versions; consider declaring the return type explicitly.
- def testMethod(TENANT: String) {
- val sparkConf = new SparkConf()
// getOrCreate() reuses an already-running session when one exists.
- val spark = SparkSession.builder.config(sparkConf).getOrCreate()
// Brings the 'symbol -> Column conversions used below into scope.
- import spark.implicits._
// Spark schema derived from the TestClass case class; passed to every read below.
- val test_struct = ScalaReflection.schemaFor[TestClass].dataType.asInstanceOf[StructType]
// coll1/coll2/coll3 are defined outside this snippet.
// NOTE(review): these are never reassigned, so `val` would do. Each read is repartitioned
// by userId before the union; confirm that shuffle is still useful given the aggregation below.
- var collection1 = mongoUtil.getCollectionFromMongoDB(TENANT,coll1,test_struct).repartition('userId)
- var collection2 = mongoUtil.getCollectionFromMongoDB(TENANT,coll2,test_struct).repartition('userId)
- var collection3 = mongoUtil.getCollectionFromMongoDB(TENANT,coll3,test_struct).repartition('userId)
// union keeps duplicate rows; the distinct count below is what de-duplicates ids.
- val finalDataFrame =collection1.union(collection2).union(collection3).select(
- 'id as "id",
- 'userId as "user_id"
// The remaining selected columns were elided in the original paste.
- ...
- ...
- ...
- )
// NOTE(review): no groupBy is visible before agg, so as written this aggregates over the
// whole dataset and yields a single row holding only "no_of_posts". If a per-user count is
// intended, a groupBy on "user_id" is needed here. Confirm against the unabridged code.
- .agg(countDistinct('id).cast(IntegerType) as "no_of_posts")
- mongoUtil.saveDataframeToMongoDB(TENANT, "Final-collectionName", finalDataFrame)
- }
/**
 * Writes `df` to a MongoDB collection for the given tenant using `SaveMode.Overwrite`.
 *
 * Connection settings are read from `conf` under the keys `mongodb.<TENANT>.uri` and
 * `mongodb.<TENANT>.database`.
 *
 * @param TENANT         tenant key used to look up the connection settings
 * @param collectionName name of the collection to write to
 * @param df             data to persist
 * @return `true` once the write call has returned; a failed config lookup or write
 *         surfaces as an exception, never as `false`
 */
def saveDataframeToMongoDB(TENANT: String, collectionName: String, df: DataFrame): Boolean = {
  val writeOptions = Map(
    "uri" -> conf.getString(s"mongodb.$TENANT.uri"),
    "database" -> conf.getString(s"mongodb.$TENANT.database"),
    "collection" -> collectionName
  )
  // No SparkSession lookup is needed here: the write goes through `df` itself.
  MongoSpark.save(df.write.mode(SaveMode.Overwrite), WriteConfig(writeOptions))
  true
}
/**
 * Loads a MongoDB collection for the given tenant as a DataFrame.
 *
 * Connection settings are read from `conf` under the keys `mongodb.<TENANT>.uri` and
 * `mongodb.<TENANT>.database`; the read runs on the shared `AppConfig.spark` session.
 *
 * @param TENANT         tenant key used to look up the connection settings
 * @param collectionName name of the collection to read
 * @param schema         schema to apply to the read; when omitted (`null`), no schema is
 *                       set on the reader and the data source decides
 * @return the DataFrame produced by the reader's `load()`
 */
def getCollectionFromMongoDB(TENANT: String, collectionName: String, schema: StructType = null): DataFrame = {
  val spark = AppConfig.spark
  val readOptions = Map(
    "uri" -> conf.getString(s"mongodb.$TENANT.uri"),
    "database" -> conf.getString(s"mongodb.$TENANT.database"),
    "collection" -> collectionName
  )
  val baseReader = spark.read.format("com.mongodb.spark.sql").options(readOptions)
  // The parameter keeps its `null` default for existing callers; only hand a schema to
  // the reader when one was actually supplied, instead of passing `null` through.
  val reader = Option(schema).fold(baseReader)(s => baseReader.schema(s))
  reader.load()
}
Add Comment
Please, Sign In to add comment