Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import datetime
- def input_paths(date, depth):
- spisok = []
- for i in range(depth):
- start_date = datetime.datetime.strptime(date, '%Y-%m-%d').date()
- next_date = start_date + datetime.timedelta(days=-i)
- path = f'hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net:8020/user/larisfdz/data/events/date={next_date}/event_type=message'
- spisok.append(path)
- return(spisok)
- input_p = input_paths('2022-05-31', 7)
- df = spark.read.parquet(*input_p)
- #all_tags = df.filter(F.col('event.message_channel_to').isNotNull()) \
- # .select(['event.message_from', F.explode(F.col('event.tags')) \
- # .alias('tag')]).groupBy(F.col('tag')) \
- # .agg(F.expr('count(distinct message_from)').alias#('n_mentions'))
- #all_tags_top = all_tags.filter(F.col('n_mentions')>99)
- all_tags_top = df.where("event.message_channel_to is not null").selectExpr(["event.message_from as user", "explode(event.tags) as tag"]).groupBy(
- "tag").agg(F.expr("count(distinct user) as suggested_count")).where("suggested_count >= 100")
- verified_tags = spark.read \
- .parquet('hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net:8020/user/master/data/snapshots/tags_verified/actual')
- candidates = all_tags_top.join(verified_tags, 'tag', 'left_anti')
- candidates \
- .write \
- .format('parquet') \
- .save(f'hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net:8020/user/larisfdz/data/analytics/candidates_d7_pyspark')
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement