Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- RAM: 16GB, CPU cores: 4, SSD: 200GB
- TABLE_NAME SCHEMA NUMBER_OF_ROWS
- table1 (table1Id, table2FkId, table3FkId, ...) 50M
- table2 (table2Id, phonenumber, email,...) 700M
- table3 (table3Id, ...) 2K
- {
- "_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c",
- "idKeys": {
- "email": [
- "a@gmail.com",
- "b@gmail.com"
- ],
- "phonenumber": [
- "1111111111",
- "2222222222"
- ]
- },
- "flag": false,
- ...
- ...
- ...
- }
- {"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c", "email": "a@gmail.com", "phonenumber": null},
- {"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c","email": "b@gmail.com","phonenumber": null},
- {"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c","email": null,"phonenumber": "1111111111"},
- {"_id": "19ac0487-a75f-49d9-928e-c300e0ac7c7c","email": null,"phonenumber": "2222222222"}
- import com.mongodb.spark.MongoSpark;
- import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
- import org.apache.spark.SparkContext;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.sql.Dataset;
- import org.apache.spark.sql.Row;
- import org.apache.spark.sql.SQLContext;
- import org.apache.spark.sql.SparkSession;
- import org.bson.Document;
- import java.util.Arrays;
- public class SparkMongoRedshiftTest {
- private static SparkSession sparkSession;
- private static SparkContext sparkContext;
- private static SQLContext sqlContext;
- public static void main(String[] args) {
- sparkSession = SparkSession.builder().appName("redshift-spark-test").getOrCreate();
- sparkContext = sparkSession.sparkContext();
- sqlContext = new SQLContext(sparkContext);
- Dataset table1Dataset = executeRedshiftQuery("(SELECT table1Id,table2FkId,table3FkId FROM table1)");
- table1Dataset.createOrReplaceTempView("table1Dataset");
- Dataset table2Dataset = executeRedshiftQuery("(SELECT table2Id,phonenumber,email FROM table2)");
- table2Dataset.createOrReplaceTempView("table2Dataset");
- Dataset table3Dataset = executeRedshiftQuery("(SELECT table3Id FROM table3");
- table3Dataset.createOrReplaceTempView("table3Dataset");
- Dataset redshiftJoinedDataset = sqlContext.sql(" SELECT a.*,b.*,c.*" +
- " FROM table1Dataset a " +
- " LEFT JOIN table2Dataset b ON a.table2FkId = b.table2Id" +
- " LEFT JOIN table3Dataset c ON a.table3FkId = c.table3Id");
- redshiftJoinedDataset.createOrReplaceTempView("redshiftJoinedDataset");
- JavaMongoRDD<Document> userIdentityRDD = MongoSpark.load(getJavaSparkContext());
- Dataset mongoDataset = userIdentityRDD.withPipeline(
- Arrays.asList(
- Document.parse("{$match: {flag: false}}"),
- Document.parse("{ $unwind: { path: "$idKeys.email" } }"),
- Document.parse("{$group: {_id: "$_id",emailArr: {$push: {email: "$idKeys.email",phonenumber: {$ifNull: ["$description", null]}}},"idKeys": {$first: "$idKeys"}}}"),
- Document.parse("{$unwind: "$idKeys.phonenumber"}"),
- Document.parse("{$group: {_id: "$_id",phoneArr: {$push: {phonenumber: "$idKeys.phonenumber",email: {$ifNull: ["$description", null]}}},"emailArr": {$first: "$emailArr"}}}"),
- Document.parse("{$project: {_id: 1,value: {$setUnion: ["$emailArr", "$phoneArr"]}}}"),
- Document.parse("{$unwind: "$value"}"),
- Document.parse("{$project: {email: "$value.email",phonenumber: "$value.phonenumber"}}")
- )).toDF();
- mongoDataset.createOrReplaceTempView("mongoDataset");
- Dataset joinRedshiftAndMongoDataset = sqlContext.sql(" SELECT a.* , b._id AS finalId " +
- " FROM redshiftJoinedData AS a INNER JOIN mongoDataset AS b " +
- " ON b.email = a.email OR b.phonenumber = a.phonenumber");
- //aggregating joinRedshiftAndMongoDataset
- //then storing to mysql
- }
- private static Dataset executeRedshiftQuery(String query) {
- return sqlContext.read()
- .format("com.databricks.spark.redshift")
- .option("url", "jdbc://...")
- .option("query", query)
- .option("aws_iam_role", "...")
- .option("tempdir", "s3a://...")
- .load();
- }
- public static JavaSparkContext getJavaSparkContext() {
- sparkContext.conf().set("spark.mongodb.input.uri", "");
- sparkContext.conf().set("spark.sql.crossJoin.enabled", "true");
- return new JavaSparkContext(sparkContext);
- }
- }
- RedshiftDataWithMongoDataJoin => (RedshiftDataJoin) INNER_JOIN (MongoData)
- => (50M LEFT_JOIN 700M LEFT_JOIN 2K) INNER_JOIN (35M)
- => (50M) INNER_JOIN (35M)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement