Advertisement
Mad_Axell

Untitled

Feb 27th, 2023
38
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.45 KB | None | 0 0
  # Reaction events only, flattened to (message_id, user_id, reaction).
  # `user_id` is the value of event.reaction_from and `reaction` is the value
  # of event.reaction_type.
  reactions = (
      spark.read.parquet("/user/madaxell/data/events/events.parquet")
      .filter("event_type = 'reaction'")
      .select(
          F.col("event.message_id").alias("message_id"),
          F.col("event.reaction_from").alias("user_id"),
          F.col("event.reaction_type").alias("reaction"),
      )
  )
  6.  
  # Message events that have a non-null event.message_channel_to, expanded to
  # one row per (message_id, tag). explode() emits no row for a message whose
  # tag array is null or empty, so untagged messages drop out here.
  messages = (
      spark.read.parquet("/user/madaxell/data/events/events.parquet")
      .filter("event_type = 'message'")
      .filter("event.message_channel_to is not null")
      .select(
          F.col("event.message_id").alias("message_id"),
          F.explode("event.tags").alias("tag"),
      )
  )
  12.  
  # Top-3 tags per (user_id, reaction).
  #
  # Each reaction is matched to the tags of the message it was left on, the
  # number of reactions is counted per (user_id, tag, reaction), and tags are
  # ranked inside every (user_id, reaction) partition. Only ranks 1..3 are kept.
  Table = (
      messages
      # Inner join instead of the former left join: a message nobody reacted
      # to has no user_id/reaction, so the left join only produced an extra
      # (null user, null reaction) group that was counted and ranked for
      # nothing. Joining on the column name also leaves a single message_id
      # column instead of two identically named ones.
      .join(reactions, "message_id", "inner")
      .groupBy("user_id", "tag", "reaction")
      .agg(F.count("*").alias("tag_count"))
      .withColumn(
          "rank",
          F.row_number().over(
              Window.partitionBy("user_id", "reaction")
              # Ties on tag_count are broken by tag (descending) so the
              # ranking is deterministic between runs.
              .orderBy(F.desc("tag_count"), F.desc("tag"))
          ),
      )
      .where("rank <= 3")
  )
  20.  
  # One row per user with that user's top-3 tags among 'like' reactions.
  # pivot("rank", [1, 2, 3]) yields columns named "1", "2" and "3"; a user with
  # fewer than three ranked tags gets null in the missing positions.
  Table_Likes = (
      Table.filter("reaction = 'like'")
      .select("user_id", "tag", "rank")
      .groupBy("user_id")
      .pivot("rank", [1, 2, 3])
      # Each (user_id, rank) cell holds exactly one tag, so first() just picks it.
      .agg(F.first("tag"))
      .withColumnRenamed("1", "like_tag_top_1")
      .withColumnRenamed("2", "like_tag_top_2")
      .withColumnRenamed("3", "like_tag_top_3")
  )
  28.  
  29.  
  # One row per user with that user's top-3 tags among 'dislike' reactions.
  # pivot("rank", [1, 2, 3]) yields columns named "1", "2" and "3"; a user with
  # fewer than three ranked tags gets null in the missing positions.
  Table_DisLikes = (
      Table.where("reaction = 'dislike'")
      .select("user_id", "tag", "rank")
      .groupBy("user_id")
      .pivot("rank", [1, 2, 3])
      # Each (user_id, rank) cell holds exactly one tag, so first() just picks it.
      .agg(F.first("tag"))
      # Fix: these columns used to be renamed to like_tag_top_N (copy-paste
      # from the likes table), which mislabelled the dislike tags and made the
      # two tables collide on column names when joined on user_id.
      .withColumnRenamed("1", "dislike_tag_top_1")
      .withColumnRenamed("2", "dislike_tag_top_2")
      .withColumnRenamed("3", "dislike_tag_top_3")
  )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement