Advertisement
Dundre32

Untitled

Jun 19th, 2020
144
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 2.04 KB | None | 0 0
  # Build the Spark configuration for the "sample_app" application and set
  # spark.sql.execution.arrow.enabled to "true".
  conf = (
      SparkConf()
      .setAppName("sample_app")
      .set("spark.sql.execution.arrow.enabled", "true")
  )
  sc = SparkContext(conf=conf)
  sqlContext = SQLContext(sc)

  # HDFS path of the training data file.
  load_data = "hdfs:///user/pknees/RSC20/training.tsv"

  # Read the file through the CSV data source: fields are delimited by the
  # \u0001 control character and column types are inferred from the data.
  df = (
      sqlContext.read.format("com.databricks.spark.csv")
      .option("inferSchema", "true")
      .option("delimiter", "\u0001")
      .load(load_data)
  )
  6.  
  # Ordered list of 24 column names for the training data.
  # NOTE(review): the visible code never applies these names to `df` --
  # presumably done elsewhere (e.g. via toDF); confirm.
  column_names = [
      "text_tokens",
      "hashtags",
      "tweet_id",
      "present_media",
      "present_links",
      "present_domains",
      "tweet_type",
      "language",
      "tweet_timestamp",
      "engaged_with_user_id",
      "engaged_with_user_follower_count",
      "engaged_with_user_following_count",
      "engaged_with_user_is_verified",
      "engaged_with_user_account_creation",
      "engaging_user_id",
      "engaging_user_follower_count",
      "engaging_user_following_count",
      "engaging_user_is_verified",
      "engaging_user_account_creation",
      "engaged_follows_engaging",
      "reply_timestamp",
      "retweet_timestamp",
      "retweet_with_comment_timestamp",
      "like_timestamp",
  ]
  8.  
  9.  
  def classifyLabels(inputCol: object) -> int:
      """Return 1 when ``inputCol`` is not None, otherwise 0.

      Only ``None`` maps to 0; falsy values such as ``0`` or ``""`` still
      count as present and map to 1.
      """
      return int(inputCol is not None)
  15.  
  # Spark UDF wrapping classifyLabels. The return type is declared explicitly:
  # without it udf() defaults to StringType and the labels come back as the
  # strings "1"/"0" instead of integers.
  labelClassify = udf(classifyLabels, IntegerType())
  17.  
  18.  
  # Add one integer label column per engagement type: 1 when the matching
  # timestamp column is non-null, 0 when it is null. A native null check is
  # used instead of a Python UDF followed by a cast; the resulting values
  # and column type (IntegerType) are the same.
  # NOTE(review): dfWithWeekdays is not defined in the visible code --
  # presumably derived from `df` earlier; confirm.
  for labelName, timestampCol in (
      ("is_reply", "reply_timestamp"),
      ("is_retweet", "retweet_timestamp"),
      ("is_retweet_with_comment", "retweet_with_comment_timestamp"),
      ("is_like", "like_timestamp"),
  ):
      dfWithWeekdays = dfWithWeekdays.withColumn(
          labelName,
          f.col(timestampCol).isNotNull().cast(IntegerType()),
      )
  25.                                      
  26.                                      
  # Keep only the rows whose is_like label equals 1.
  dfWithWeekdaysLikes = dfWithWeekdays.filter(dfWithWeekdays["is_like"] == 1)

  # Count the liked rows per value of tweet_timestamp_weekdays. The
  # aggregation is lazy, so the result is bound to a name and displayed;
  # a bare groupBy(...).count() expression would never be evaluated.
  likesPerWeekday = dfWithWeekdaysLikes.groupBy("tweet_timestamp_weekdays").count()
  likesPerWeekday.show()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement