Advertisement
Guest User

Untitled

a guest
Oct 4th, 2016
105
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.03 KB | None | 0 0
  1. def toRedshift(time, rdd):
  2. try:
  3.  
  4. sqlContext = getSqlContextInstance(rdd.context)
  5. schema = StructType([
  6. StructField('user_id', StringType(), True),
  7. StructField('device_id', StringType(), True),
  8. StructField('steps', IntegerType(), True),
  9. StructField('battery_level', IntegerType(), True),
  10. StructField('calories_spent', IntegerType(), True),
  11. StructField('distance', FloatType(), True),
  12. StructField('current_time', IntegerType(), True),
  13.  
  14. ])
  15. df = sqlContext.createDataFrame(rdd, schema)
  16. df.registerTempTable("activity_log")
  17. df.write \
  18. .format("com.databricks.spark.redshift") \
  19. .option("url", "jdbc:redshiftURL.com:5439/database?user=USERNAME&password=PASSWORD") \
  20. .option("dbtable", "activity_log") \
  21. .option("tempdir", "s3n://spark-temp-data/") \
  22. .mode("append") \
  23. .save()
  24. except Exception as e:
  25. raise(e)
# Register the per-batch writer callback on the input DStream.
# NOTE(review): the callback passed is `process`, but the function
# defined above is `toRedshift` — confirm `process` is defined elsewhere
# in the full file, or this should be `py_rdd.foreachRDD(toRedshift)`.
py_rdd.foreachRDD(process)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement