# -*- coding: utf-8 -*-

"""User behavior statistics."""
import datetime
import os
from pyspark.sql import SparkSession


def process_a_day(date):
    """Process one day of logs."""
    day = date.day

    # Each day's logs live under a directory named after its day of month.
    if not os.path.exists("/Users/yudu/Downloads/" + str(day)):
        return

    log_file = "/Users/yudu/Downloads/" + str(day) + "/*/*.snappy.parquet"

    log_data = spark.read.parquet(log_file)
    log_data.show()
    # createOrReplaceTempView instead of createTempView: this function runs
    # once per day in a loop, and a second createTempView("behaviors") would
    # fail because the view already exists.
    log_data.createOrReplaceTempView("behaviors")

    # COUNT(IF(cond, true, null)) counts only rows matching cond, since COUNT
    # ignores NULLs. The date column is tagged with the day being processed
    # (the original used date.today(), which stamped every batch with the run
    # date and broke the resume logic in get_last_day).
    behavior_stats = spark.sql(
        "SELECT '" + str(date) + "' AS date, forum_id, "
        "COUNT(IF(action='post',true,null)) AS posts, "
        "COUNT(IF(action='delpost',true,null)) AS delposts, "
        "COUNT(IF(action='view',true,null)) AS views, "
        "COUNT(IF(action='comment',true,null)) AS comments, "
        "COUNT(IF(action='delcomment',true,null)) AS delcomments, "
        "COUNT(IF(action='like',true,null)) AS likes, "
        "COUNT(IF(action='dellike',true,null)) AS dellikes, "
        "COUNT(IF(action='favorite',true,null)) AS favorites, "
        "COUNT(IF(action='delfavorite',true,null)) AS delfavorites, "
        "COUNT(IF(action='share',true,null)) AS shares "
        "FROM behaviors GROUP BY forum_id ORDER BY forum_id")
    behavior_stats.show()
    behavior_stats.write.jdbc(
        "jdbc:mysql://localhost/aliyun_log_test", "wb_forum_behaviors",
        mode="append",
        properties={"driver": "com.mysql.jdbc.Driver", "useSSL": "false",
                    "user": "root"})

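# Note: both JDBC calls in this script assume a passwordless local MySQL root
# account. In a real deployment the credentials would typically include a
# password in the properties dict, e.g. (hypothetical value, not part of the
# original):
#
#     properties={"driver": "com.mysql.jdbc.Driver", "useSSL": "false",
#                 "user": "root", "password": "secret"}
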
def get_last_day():
    """Return the most recent date already recorded in the stats table."""
    df = spark.read.jdbc(
        "jdbc:mysql://localhost/aliyun_log_test", "wb_forum_behaviors",
        properties={"driver": "com.mysql.jdbc.Driver", "useSSL": "false",
                    "user": "root"})
    date = df.orderBy(df.date.desc()).first().date
    return date

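# Note: get_last_day() assumes wb_forum_behaviors already contains at least
# one row; on an empty table df.first() returns None and .date raises
# AttributeError. A guarded variant might look like this, where the fallback
# start date is a hypothetical backfill point, not part of the original:
#
#     row = df.orderBy(df.date.desc()).first()
#     if row is None:
#         return datetime.date(2017, 7, 1)  # hypothetical earliest log day
#     return row.date
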
# binaryAsString makes Spark read Parquet binary columns as strings.
spark = SparkSession.builder.appName("User Behavior Stats").master(
    "local").config("spark.sql.parquet.binaryAsString", True).getOrCreate()

# Start from the day after the last recorded date
d = get_last_day() + datetime.timedelta(days=1)
# Stop at yesterday
end_day = datetime.date.today() - datetime.timedelta(days=1)
while d <= end_day:
    print(d.strftime("%Y-%m-%d"))
    process_a_day(d)
    d += datetime.timedelta(days=1)

spark.stop()
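
# To run this script, the MySQL JDBC driver must be on the Spark driver's
# classpath, e.g. (jar name and script filename are illustrative):
#
#     spark-submit --driver-class-path mysql-connector-java-5.1.44-bin.jar \
#         behavior_stats.py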