RAM: 16GB, CPU cores: 4, SSD: 200GB

TABLE_NAME   SCHEMA                                    NUMBER_OF_ROWS
table1       (table1Id, table2FkId, table3FkId, ...)   50M
table2       (table2Id, phonenumber, email, ...)       700M
table3       (table3Id, ...)                           2K

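A minimal sketch of where the job's resources would be configured for the 16GB / 4-core box above, assuming the job runs in local mode on that single machine. The values are illustrative assumptions, not tuned numbers; note that memory settings only take effect at JVM launch, so in practice they belong in spark-submit or spark-defaults rather than in code.

import org.apache.spark.sql.SparkSession;

public class SizingSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("redshift-spark-test")
                .master("local[4]")                            // all 4 cores (assumption: local mode)
                .config("spark.driver.memory", "12g")          // must really be set at JVM launch time
                .config("spark.sql.shuffle.partitions", "400") // shuffling 700M rows needs more than the 200 default
                .getOrCreate();
        spark.stop();
    }
}
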
{
    "_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c",
    "idKeys": {
        "email": [
            "a@gmail.com",
            "b@gmail.com"
        ],
        "phonenumber": [
            "1111111111",
            "2222222222"
        ]
    },
    "flag": false,
    ...
}

Flattened form of the document above (one row per email or phonenumber), as produced by the aggregation pipeline in the Spark job below:

{"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c", "email": "a@gmail.com", "phonenumber": null},
{"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c", "email": "b@gmail.com", "phonenumber": null},
{"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c", "email": null, "phonenumber": "1111111111"},
{"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c", "email": null, "phonenumber": "2222222222"}

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

import java.util.Arrays;

public class SparkMongoRedshiftTest {

    private static SparkSession sparkSession;
    private static SparkContext sparkContext;
    private static SQLContext sqlContext;

    public static void main(String[] args) {

        sparkSession = SparkSession.builder().appName("redshift-spark-test").getOrCreate();
        sparkContext = sparkSession.sparkContext();
        sqlContext = new SQLContext(sparkContext);

        // Load the three Redshift tables and register them as temp views.
        Dataset<Row> table1Dataset = executeRedshiftQuery("(SELECT table1Id,table2FkId,table3FkId FROM table1)");
        table1Dataset.createOrReplaceTempView("table1Dataset");

        Dataset<Row> table2Dataset = executeRedshiftQuery("(SELECT table2Id,phonenumber,email FROM table2)");
        table2Dataset.createOrReplaceTempView("table2Dataset");

        Dataset<Row> table3Dataset = executeRedshiftQuery("(SELECT table3Id FROM table3)");
        table3Dataset.createOrReplaceTempView("table3Dataset");

        // 50M LEFT JOIN 700M LEFT JOIN 2K, all on the Redshift side.
        Dataset<Row> redshiftJoinedDataset = sqlContext.sql(" SELECT a.*, b.*, c.*" +
                " FROM table1Dataset a" +
                " LEFT JOIN table2Dataset b ON a.table2FkId = b.table2Id" +
                " LEFT JOIN table3Dataset c ON a.table3FkId = c.table3Id");
        redshiftJoinedDataset.createOrReplaceTempView("redshiftJoinedDataset");

        // Flatten each Mongo document into one row per email and one row per
        // phonenumber (the flat rows shown above). The $ifNull on the nonexistent
        // $description field is a trick to emit a literal null for the other column.
        JavaMongoRDD<Document> userIdentityRDD = MongoSpark.load(getJavaSparkContext());
        Dataset<Row> mongoDataset = userIdentityRDD.withPipeline(
                Arrays.asList(
                        Document.parse("{$match: {flag: false}}"),
                        Document.parse("{$unwind: {path: \"$idKeys.email\"}}"),
                        Document.parse("{$group: {_id: \"$_id\", emailArr: {$push: {email: \"$idKeys.email\", phonenumber: {$ifNull: [\"$description\", null]}}}, idKeys: {$first: \"$idKeys\"}}}"),
                        Document.parse("{$unwind: \"$idKeys.phonenumber\"}"),
                        Document.parse("{$group: {_id: \"$_id\", phoneArr: {$push: {phonenumber: \"$idKeys.phonenumber\", email: {$ifNull: [\"$description\", null]}}}, emailArr: {$first: \"$emailArr\"}}}"),
                        Document.parse("{$project: {_id: 1, value: {$setUnion: [\"$emailArr\", \"$phoneArr\"]}}}"),
                        Document.parse("{$unwind: \"$value\"}"),
                        Document.parse("{$project: {email: \"$value.email\", phonenumber: \"$value.phonenumber\"}}")
                )).toDF();
        mongoDataset.createOrReplaceTempView("mongoDataset");

        // The OR condition cannot be planned as an equi-join, hence the
        // spark.sql.crossJoin.enabled setting in getJavaSparkContext().
        Dataset<Row> joinRedshiftAndMongoDataset = sqlContext.sql(" SELECT a.*, b._id AS finalId" +
                " FROM redshiftJoinedDataset AS a INNER JOIN mongoDataset AS b" +
                " ON b.email = a.email OR b.phonenumber = a.phonenumber");

        // aggregating joinRedshiftAndMongoDataset
        // then storing to mysql
    }

    private static Dataset<Row> executeRedshiftQuery(String query) {
        return sqlContext.read()
                .format("com.databricks.spark.redshift")
                .option("url", "jdbc://...")
                .option("query", query)
                .option("aws_iam_role", "...")
                .option("tempdir", "s3a://...")
                .load();
    }

    public static JavaSparkContext getJavaSparkContext() {
        sparkContext.conf().set("spark.mongodb.input.uri", "");
        sparkContext.conf().set("spark.sql.crossJoin.enabled", "true");
        return new JavaSparkContext(sparkContext);
    }
}
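
The paste ends at "storing to mysql" without showing that step. A hedged sketch of what the final write might look like, using Spark's standard JDBC writer; the URL, table name, and credentials are placeholders, and aggregatedDataset stands in for the aggregated result mentioned in the comment:

import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class MysqlWriteSketch {
    static void writeToMysql(Dataset<Row> aggregatedDataset) {
        Properties props = new Properties();
        props.setProperty("user", "...");                     // placeholder
        props.setProperty("password", "...");                 // placeholder
        props.setProperty("driver", "com.mysql.jdbc.Driver"); // MySQL JDBC driver on the classpath

        aggregatedDataset.write()
                .mode(SaveMode.Append)                        // or Overwrite, depending on the target table
                .jdbc("jdbc:mysql://...", "target_table", props); // hypothetical endpoint and table
    }
}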

RedshiftDataWithMongoDataJoin => (RedshiftDataJoin) INNER_JOIN (MongoData)
                              => (50M LEFT_JOIN 700M LEFT_JOIN 2K) INNER_JOIN (35M)
                              => (50M) INNER_JOIN (35M)
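
One possible way to tame that final 50M x 35M join (a sketch, not part of the original paste): the OR condition blocks equi-join planning, which is why spark.sql.crossJoin.enabled had to be set. Splitting it into two equi-joins and unioning the results lets Spark use sort-merge or hash joins instead. Assumes the temp views registered in main() above:

Dataset<Row> byEmail = sqlContext.sql(
        "SELECT a.*, b._id AS finalId FROM redshiftJoinedDataset a" +
        " INNER JOIN mongoDataset b ON b.email = a.email");
Dataset<Row> byPhone = sqlContext.sql(
        "SELECT a.*, b._id AS finalId FROM redshiftJoinedDataset a" +
        " INNER JOIN mongoDataset b ON b.phonenumber = a.phonenumber");

// union() has UNION ALL semantics; distinct() drops rows that matched on both
// identifiers. Nulls never satisfy an equality predicate, so the null email /
// phonenumber columns in mongoDataset simply fail to match, just as in the OR join.
Dataset<Row> joinedByEitherKey = byEmail.union(byPhone).distinct();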