from collections import defaultdict
from datetime import datetime
from operator import add

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext()
sqlCtx = HiveContext(sc)

# Collect each device's distinct login days since 2018-01-01.
sql = '''
select imei as key, collect_list(distinct day) as days
from ana_login.ana_user_login
where day > 20180101
group by imei
'''

df = sqlCtx.sql(sql)

def handle(row):
    '''
    row: (key, days)
    flatMap each row to a list of (interval_days, times) pairs:
    (key, days) => [(interval_days, times), (interval_days, times), ...]
    where interval_days is the gap between two consecutive login days
    and times is how often that gap occurs for this device.
    '''
    result = []
    tid = row['key']  # device imei; not used in the interval counts
    day = row['days']
    day.sort()
    distribution = defaultdict(int)
    # Pair each login day with the next one and measure the gap in days.
    for before, after in zip(day, day[1:]):
        interval = (datetime.strptime(str(after), "%Y%m%d")
                    - datetime.strptime(str(before), "%Y%m%d")).days
        distribution[interval] += 1
    for k, v in distribution.items():
        result.append((k, v))
    return result
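
# For illustration only: a local sanity check of handle() on a synthetic
# row. The Row usage and the sample key/days values are assumptions made
# for this example, not part of the original job.
from pyspark.sql import Row

sample = Row(key='test-imei', days=[20180102, 20180105, 20180101])
# Sorted days are 20180101, 20180102, 20180105: gaps of 1 and 3 days,
# so this should print [(1, 1), (3, 1)] (pair order may vary).
print(handle(sample))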

# Sum each gap length's counts across all devices and bring the totals
# back to the driver.
intervals = df.rdd.flatMap(handle).reduceByKey(add).collect()
print(intervals)
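
# For illustration: one way to present the collected distribution, sorted
# by gap length. This reporting step is an assumption, not in the paste.
for interval, times in sorted(intervals):
    print('%d-day gap: %d occurrences' % (interval, times))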