Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # -*- coding: utf-8 -*-
- """用户行为统计"""
- import datetime
- import os
- from pyspark.sql import SparkSession
def process_a_day(date):
    """Aggregate one day's behavior logs per forum and append them to MySQL.

    Reads the parquet files under ``/Users/yudu/Downloads/<day-of-month>/``,
    counts each action type per ``forum_id`` and appends the result to the
    ``wb_forum_behaviors`` table. Returns ``None`` without doing anything
    when the directory for that day does not exist.

    Args:
        date: The ``datetime.date`` whose logs should be processed. Only its
            day-of-month selects the input directory; the full date is
            written to the ``date`` column of the output rows.
    """
    day_dir = "/Users/yudu/Downloads/" + str(date.day)
    if not os.path.exists(day_dir):
        return

    log_data = spark.read.parquet(day_dir + "/*/*.snappy.parquet")
    log_data.show()
    # This function is called once per day in a loop, so the view from the
    # previous call already exists; replace it instead of failing.
    log_data.createOrReplaceTempView("behaviors")

    # Stamp rows with the day being processed. ``date.today()`` would be the
    # current date (it is a classmethod), which mislabels every backfilled day.
    stats_date = date.strftime("%Y-%m-%d")
    behavior_stats = spark.sql(
        "SELECT '" + stats_date + "' as date, forum_id, "
        "COUNT(IF(action='post',true,null)) AS posts, "
        "COUNT(IF(action='delpost',true,null)) AS delposts, "
        "COUNT(IF(action='view',true,null)) AS views, "
        "COUNT(IF(action='comment',true,null)) AS comments, "
        "COUNT(IF(action='delcomment',true,null)) AS delcomments, "
        "COUNT(IF(action='like',true,null)) AS likes, "
        "COUNT(IF(action='dellike',true,null)) AS dellikes, "
        "COUNT(IF(action='favorite',true,null)) AS favorites, "
        "COUNT(IF(action='delfavorite',true,null)) AS delfavorites, "
        "COUNT(IF(action='share',true,null)) AS shares "
        "FROM behaviors GROUP BY forum_id ORDER BY forum_id"
    )
    behavior_stats.show()
    behavior_stats.write.jdbc(
        "jdbc:mysql://localhost/aliyun_log_test",
        "wb_forum_behaviors",
        mode="append",
        properties={"driver": "com.mysql.jdbc.Driver", "useSSL": "false", "user": "root"},
    )
def get_last_day():
    """Return the latest ``date`` value stored in ``wb_forum_behaviors``.

    Loads the table over JDBC, sorts it by ``date`` descending and returns
    the ``date`` field of the first row.

    NOTE(review): if the table has no rows, ``first()`` presumably yields
    ``None`` and the attribute access below fails -- confirm and decide on a
    bootstrap date if that case matters.
    """
    jdbc_url = "jdbc:mysql://localhost/aliyun_log_test"
    conn_props = {"driver": "com.mysql.jdbc.Driver", "useSSL": "false", "user": "root"}
    stats = spark.read.jdbc(jdbc_url, "wb_forum_behaviors", properties=conn_props)
    newest_first = stats.orderBy(stats.date.desc())
    newest_row = newest_first.first()
    return newest_row.date
# Shared Spark session used by process_a_day() and get_last_day().
spark = (
    SparkSession.builder
    .appName("User Behavior Stats")
    .master("local")
    .config("spark.sql.parquet.binaryAsString", True)
    .getOrCreate()
)

try:
    # Start from the day after the last date already recorded.
    d = get_last_day() + datetime.timedelta(days=1)
    # End at yesterday (inclusive).
    end_day = datetime.date.today() - datetime.timedelta(days=1)
    while d <= end_day:
        # Call form of print works on both Python 2 and Python 3.
        print(d.strftime("%Y-%m-%d"))
        process_a_day(d)
        d += datetime.timedelta(days=1)
finally:
    # Always release the session, even if a day fails to process.
    spark.stop()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement