Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
def toRedshift(time, rdd):
    """Write one Spark Streaming micro-batch of activity-log rows to Redshift.

    Intended as a ``foreachRDD`` callback: Spark Streaming invokes it once per
    batch interval with the batch time and the batch's RDD.

    Parameters
    ----------
    time : batch timestamp supplied by ``foreachRDD`` (unused here, but the
        two-argument signature is what ``foreachRDD`` passes).
    rdd : RDD of rows matching the ``activity_log`` schema below.

    Raises
    ------
    Any exception from Spark or the Redshift connector propagates to the
    caller unchanged. (The original wrapped the body in
    ``except Exception as e: raise(e)``, a no-op re-raise; removed.)
    """
    sqlContext = getSqlContextInstance(rdd.context)
    schema = StructType([
        StructField('user_id', StringType(), True),
        StructField('device_id', StringType(), True),
        StructField('steps', IntegerType(), True),
        StructField('battery_level', IntegerType(), True),
        StructField('calories_spent', IntegerType(), True),
        StructField('distance', FloatType(), True),
        StructField('current_time', IntegerType(), True),
    ])
    df = sqlContext.createDataFrame(rdd, schema)
    # Registered so other queries in this SQLContext can reference the batch.
    df.registerTempTable("activity_log")
    # The connector stages data in S3 (tempdir) and COPYs it into Redshift.
    # SECURITY: credentials are hard-coded in the JDBC URL — move USERNAME /
    # PASSWORD into configuration or environment variables, not source.
    df.write \
        .format("com.databricks.spark.redshift") \
        .option("url", "jdbc:redshiftURL.com:5439/database?user=USERNAME&password=PASSWORD") \
        .option("dbtable", "activity_log") \
        .option("tempdir", "s3n://spark-temp-data/") \
        .mode("append") \
        .save()
- py_rdd.foreachRDD(process)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement