Not a member of Pastebin yet?
Sign up — it unlocks many cool features!
from datetime import datetime

# Resolve the system GCS bucket and the GCP project id from the Hadoop
# configuration attached to the active Spark session (these keys are
# populated by the GCS connector on Dataproc clusters).
bucket = spark._jsc.hadoopConfiguration().get("fs.gs.system.bucket")
project = spark._jsc.hadoopConfiguration().get("fs.gs.project.id")

# Timestamp string suitable for building unique paths, e.g.
# "2024-01-31-12-00-00". Idiomatic instance call instead of the static
# datetime.strftime(datetime.today(), ...) form — identical output.
# NOTE(review): not used in this snippet — presumably intended to uniquify
# the temp GCS path used below; confirm against the rest of the file.
todays_date = datetime.today().strftime("%Y-%m-%d-%H-%M-%S")
# ---------------------------------------------------------------------------
# Load project_A.datasetA.tableA from BigQuery.
# The BigQuery Hadoop connector exports the table to the temp GCS path,
# then the Hadoop input format reads it back as (LongWritable, JSON-text)
# key/value records.
# ---------------------------------------------------------------------------
conf = {
    # Input Parameters
    "mapred.bq.project.id": project,
    "mapred.bq.gcs.bucket": bucket,
    # NOTE(review): `input_directory` must be defined earlier in this file —
    # verify. It is also reused by the second table load below; the
    # connector stages exports under this path, so sharing it between jobs
    # risks collisions — consider a distinct directory per job.
    "mapred.bq.temp.gcs.path": input_directory,
    "mapred.bq.input.project.id": "project_A",  # quoting normalized to match the rest
    "mapred.bq.input.dataset.id": "datasetA",
    "mapred.bq.input.table.id": "tableA",
}
table_data = spark.sparkContext.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "com.google.gson.JsonObject",
    conf=conf)

# Extract the JSON strings (the record values) from the key/value RDD.
table_json = table_data.map(lambda x: x[1])

# Parse the JSON strings into a Spark DataFrame.
ts1_data = spark.read.json(table_json)

# Register a view so Spark SQL queries can be run against the data.
ts1_data.createOrReplaceTempView("dataA")
# ---------------------------------------------------------------------------
# Load project_B.datasetB.tableB from BigQuery, via the same Hadoop
# BigQuery-connector mechanism used for tableA earlier in this file.
# ---------------------------------------------------------------------------
conf2 = {
    # Input Parameters
    "mapred.bq.project.id": project,
    "mapred.bq.gcs.bucket": bucket,
    # NOTE(review): this reuses the SAME temp GCS path as the first table
    # load; the connector stages (and cleans up) exports under this path,
    # so two jobs sharing it can collide — consider a per-job subdirectory.
    "mapred.bq.temp.gcs.path": input_directory,
    "mapred.bq.input.project.id": "project_B",  # quoting normalized to match the rest
    "mapred.bq.input.dataset.id": "datasetB",
    "mapred.bq.input.table.id": "tableB",
}
table_data2 = spark.sparkContext.newAPIHadoopRDD(
    "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "com.google.gson.JsonObject",
    conf=conf2)
Add Comment
Please sign in to add a comment.