Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Builds, per user, the top-3 tags of the messages they reacted to, as two wide
# tables: one for 'like' reactions and one for 'dislike' reactions.
# `spark`, `F` and `Window` are used here but defined outside this snippet
# (presumably a SparkSession, pyspark.sql.functions and pyspark.sql.Window).

# Single read of the events dataset; both inputs below are filtered from it.
events = spark.read.parquet("/user/madaxell/data/events/events.parquet")

# One row per reaction event: message id, reacting user, reaction type.
reactions = (
    events
    .where("event_type = 'reaction'")
    .select(
        F.col("event.message_id").alias("message_id"),
        F.col("event.reaction_from").alias("user_id"),
        F.col("event.reaction_type").alias("reaction"),
    )
)

# One row per (message, tag) for message events whose message_channel_to is set.
messages = (
    events
    .where("event_type = 'message'")
    .where("event.message_channel_to is not null")
    .select(
        F.col("event.message_id").alias("message_id"),
        F.explode(F.col("event.tags")).alias("tag"),
    )
)

# Ranking inside each (user, reaction) group: most frequent tag first,
# ties broken by tag in descending order so the ranking is deterministic.
_rank_window = (
    Window
    .partitionBy("user_id", "reaction")
    .orderBy(F.desc("tag_count"), F.desc("tag"))
)

# Tag counts per (user, tag, reaction), keeping the three best-ranked tags.
# The join is a left join, so messages without any reaction yield groups with
# null user_id/reaction; the reaction filters further down discard them.
Table = (
    messages
    .join(reactions, on="message_id", how="left")
    .groupBy("user_id", "tag", "reaction")
    .agg(F.count("*").alias("tag_count"))
    .withColumn("rank", F.row_number().over(_rank_window))
    .where("rank <= 3")
)


def _top_tags_wide(ranked, reaction, prefix):
    """Pivot the ranked tags of one reaction type into one row per user.

    Args:
        ranked: DataFrame with columns user_id, tag, reaction and rank (1..3).
        reaction: reaction value to keep, e.g. 'like'.
        prefix: prefix for the output columns, giving
            <prefix>_tag_top_1 .. <prefix>_tag_top_3.

    Returns:
        DataFrame with user_id plus the three renamed tag columns; a rank
        a user does not have is null.
    """
    wide = (
        ranked
        .where(F.col("reaction") == reaction)
        .select("user_id", "tag", "rank")
        .groupBy("user_id")
        # Explicit pivot values guarantee all three columns always exist.
        .pivot("rank", [1, 2, 3])
        .agg(F.first("tag"))
    )
    for position in (1, 2, 3):
        wide = wide.withColumnRenamed(str(position), f"{prefix}_tag_top_{position}")
    return wide


Table_Likes = _top_tags_wide(Table, "like", "like")

# Fixed: these columns were previously named like_tag_top_N (copy-paste from the
# likes table), which mislabeled the data and clashed with Table_Likes.
Table_DisLikes = _top_tags_wide(Table, "dislike", "dislike")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement