Guest User

Untitled

a guest
Jun 21st, 2018
89
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.42 KB | None | 0 0
  1. from datetime import datetime
  2.  
  3. bucket = spark._jsc.hadoopConfiguration().get("fs.gs.system.bucket")
  4. project = spark._jsc.hadoopConfiguration().get("fs.gs.project.id")
  5.  
  6. todays_date = datetime.strftime(datetime.today(), "%Y-%m-%d-%H-%M-%S")
  7.  
  8. conf = {
  9. # Input Parameters
  10. "mapred.bq.project.id": project,
  11. "mapred.bq.gcs.bucket": bucket,
  12. "mapred.bq.temp.gcs.path": input_directory,
  13. "mapred.bq.input.project.id": 'project_A',
  14. "mapred.bq.input.dataset.id": "datasetA",
  15. "mapred.bq.input.table.id": "tableA",
  16. }
  17.  
  18. table_data = spark.sparkContext.newAPIHadoopRDD(
  19. "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
  20. "org.apache.hadoop.io.LongWritable",
  21. "com.google.gson.JsonObject",
  22. conf=conf)
  23.  
  24. # Extract the JSON strings from the RDD.
  25. table_json = table_data.map(lambda x: x[1])
  26.  
  27. # Load the JSON strings as a Spark Dataframe.
  28. ts1_data = spark.read.json(table_json)
  29. # Create a view so that Spark SQL queries can be run against the data.
  30. ts1_data.createOrReplaceTempView("dataA")
  31.  
  32. conf2 = {
  33. # Input Parameters
  34. "mapred.bq.project.id": project,
  35. "mapred.bq.gcs.bucket": bucket,
  36. "mapred.bq.temp.gcs.path": input_directory,
  37. "mapred.bq.input.project.id": 'project_B',
  38. "mapred.bq.input.dataset.id": "datasetB",
  39. "mapred.bq.input.table.id": "tableB",
  40. }
  41.  
  42. table_data2 = spark.sparkContext.newAPIHadoopRDD(
  43. "com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat",
  44. "org.apache.hadoop.io.LongWritable",
  45. "com.google.gson.JsonObject",
  46. conf=conf2)
Add Comment
Please, Sign In to add comment