Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import sys
from collections import defaultdict
from datetime import date, datetime
from operator import add

from pyspark import SparkContext
from pyspark.sql import HiveContext  # HiveContext lives in pyspark.sql, not the pyspark top-level package
from pyspark.sql.types import *
# Driver setup: Spark context, Hive-backed SQL context, and the source query.
sc = SparkContext()

# This line was commented out in the original, leaving `sqlCtx` undefined
# at the .sql() call below (NameError). HiveContext is required because the
# query reads a Hive table (ana_login.ana_user_login).
sqlCtx = HiveContext(sc)

# One row per device: imei plus the distinct list of login days (yyyymmdd
# ints) after 2018-01-01.
sql = '''
select imei as key, collect_list(distinct day) as days from ana_login.ana_user_login where day > 20180101 group by imei
'''
df = sqlCtx.sql(sql)
def handle(row):
    """Flat-map one user's login history to gap-length frequency pairs.

    Args:
        row: mapping with 'key' (imei, unused here) and 'days'
             (list of yyyymmdd ints — assumed valid dates; TODO confirm).

    Returns:
        List of (interval_days, times) tuples: for each gap length between
        consecutive (sorted) login days, how many times that gap occurs
        for this user. Empty list when the user has fewer than two days.
    """
    # sorted() instead of .sort(): avoids mutating the list stored inside
    # the input row, which is shared Spark data.
    days = sorted(row['days'])
    distribution = defaultdict(int)
    # Pair each day with its successor to measure consecutive-login gaps.
    for before, after in zip(days, days[1:]):
        interval = (datetime.strptime(str(after), "%Y%m%d")
                    - datetime.strptime(str(before), "%Y%m%d")).days
        distribution[interval] += 1
    return list(distribution.items())
# Aggregate across all users: flatMap emits (interval_days, times) pairs
# per user, reduceByKey sums the counts for each gap length globally, and
# collect() pulls the final distribution to the driver.
# NOTE(review): the collected result is discarded — presumably it should be
# assigned and printed (see the commented-out line below); confirm intent.
df.rdd.flatMap(handle).reduceByKey(add).collect()
# print(rdd.collect())
Add Comment
Please, Sign In to add comment