Advertisement
laris_fdz

Untitled

Jan 16th, 2023
543
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 1.53 KB | None | 0 0
  1. import datetime
  2. def input_paths(date, depth):
  3.     spisok = []
  4.     for i in range(depth):
  5.         start_date = datetime.datetime.strptime(date, '%Y-%m-%d').date()
  6.         next_date = start_date + datetime.timedelta(days=-i)
  7.         path = f'hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net:8020/user/larisfdz/data/events/date={next_date}/event_type=message'
  8.         spisok.append(path)
  9.     return(spisok)
  10. input_p = input_paths('2022-05-31', 7)
  11.  
  12. df = spark.read.parquet(*input_p)
  13. #all_tags = df.filter(F.col('event.message_channel_to').isNotNull()) \
  14. #                     .select(['event.message_from', F.explode(F.col('event.tags')) \
  15. #                              .alias('tag')]).groupBy(F.col('tag')) \
  16. #                                                       .agg(F.expr('count(distinct message_from)').alias#('n_mentions'))
  17. #all_tags_top = all_tags.filter(F.col('n_mentions')>99)
  18. all_tags_top = df.where("event.message_channel_to is not null").selectExpr(["event.message_from as user", "explode(event.tags) as tag"]).groupBy(
  19.     "tag").agg(F.expr("count(distinct user) as suggested_count")).where("suggested_count >= 100")
  20.  
  21. verified_tags = spark.read \
  22.     .parquet('hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net:8020/user/master/data/snapshots/tags_verified/actual')
  23. candidates = all_tags_top.join(verified_tags, 'tag', 'left_anti')
  24. candidates \
  25.     .write \
  26.     .format('parquet') \
  27.     .save(f'hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net:8020/user/larisfdz/data/analytics/candidates_d7_pyspark')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement