Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.vng.zing.zudm_zingmp3_suggestion.relistening_prediction
- import org.apache.spark.sql.SQLContext
- import org.apache.spark.sql.Dataset
- import org.apache.spark.sql.Row
- import com.vng.zing.zudm_zingmp3_suggestion.spark_io.DFGenericInputOutputTransformation
- import org.apache.spark.sql.types.{ StructType, LongType, IntegerType, BooleanType, StringType, TimestampType, DoubleType }
- import org.apache.spark.sql.functions._
- import org.apache.spark.sql.expressions.Window
/**
 * Extracts per-user / per-(user, song) listening features from raw play-log rows.
 *
 * Expected input columns (at minimum): user_id, id (song id), pos, duration,
 * timestamp (epoch millis), isDownloaded, age, type, volume.
 * NOTE(review): column semantics are inferred from usage here — confirm against
 * the upstream log schema.
 *
 * Output: one row per (uid, sid) with aggregate listening features, coalesced
 * to 35 partitions for downstream writing.
 */
abstract class FeatureExtractionFromLog extends DFGenericInputOutputTransformation {

  /** SQL predicate used to shard users across jobs (see the companion objects below). */
  def filter: String

  def transformation(sc: SQLContext, input: Dataset[Row]): Dataset[Row] = {
    import sc.implicits._
    // Raise the broadcast-join threshold (~400 MB) so the many small
    // per-user aggregates below are broadcast instead of shuffled.
    input.sqlContext.sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", 400000000)
    // input.sqlContext.sql("set spark.sql.shuffle.partitions=50")

    // Reference "now"; daysDiff below is measured backwards from cur_date + 1 day.
    val cur_date = new java.sql.Timestamp(System.currentTimeMillis())
    println("Last timestamp: " + cur_date)

    // Normalized, filtered event set with defaults filled for missing values.
    val ds = input
      .filter(filter)
      .withColumn("pos", $"pos".cast(DoubleType))
      .withColumn("duration", $"duration".cast(DoubleType))
      // A play counts as "fully listened" when >= 70% of the track was heard.
      .withColumn("fullyListen", ($"pos" / $"duration" >= 0.7).cast(IntegerType))
      .withColumn("ratio", $"pos" / $"duration")
      .withColumn("date", from_unixtime($"timestamp" / 1000, "yyyy-MM-dd HH:mm:ss"))
      // +1 day so that events from "today" get daysDiff >= 1, never 0.
      .withColumn("dateTmp", date_add(lit(cur_date), 1))
      .withColumn("daysDiff", datediff($"dateTmp", $"date"))
      .withColumn("isDownloaded", $"isDownloaded".cast(IntegerType))
      .withColumn("age", $"age".cast(IntegerType))
      .withColumn("type", $"type".cast(IntegerType))
      .withColumn("volume", $"volume".cast(IntegerType))
      // Keep only event types 1 and 2 — presumably the "play" events; verify upstream.
      .filter($"type" === 1 || $"type" === 2)
      .na.fill(0, Seq("isDownloaded"))
      .na.fill(50, Seq("volume"))
      .na.fill(25, Seq("age"))
      .na.fill(0, Seq("fullyListen"))
      .na.fill(0.5, Seq("ratio"))
      .select(
        $"user_id".as("uid"),
        $"id".as("sid"),
        $"isDownloaded",
        $"volume",
        $"age",
        $"fullyListen",
        $"ratio",
        $"daysDiff")

    // Listen ratio of the most recent play of each (uid, sid).
    val byLastListen = Window.partitionBy($"uid", $"sid").orderBy($"daysDiff".asc)
    val group_last_listen_ratio = ds.withColumn("rn", row_number.over(byLastListen))
      .where($"rn" === 1)
      .drop("rn")
      .select($"uid", $"sid", $"ratio".as("lastListenRatio"))

    // Population std-dev of the gaps between a user's distinct listening days.
    // FIX(review): the original iterated the de-duplicated days in Set (hash)
    // order — making the gap sequence, and therefore the std, non-deterministic —
    // and divided the sum of squared deviations by days.length instead of the
    // number of gaps. Days are now sorted before differencing and the variance
    // is taken over the gaps themselves.
    val stdDiff = udf((days: Seq[Int]) => {
      val sortedDays = days.distinct.sorted
      if (sortedDays.length < 2) {
        0.0
      } else {
        val gaps = sortedDays.zip(sortedDays.tail).map { case (a, b) => b - a }
        val mean = gaps.sum.toDouble / gaps.length
        val variance = gaps.map(g => (g - mean) * (g - mean)).sum / gaps.length
        math.sqrt(variance)
      }
    })
    val group_listen_time_std = ds.groupBy("uid").agg(collect_set("daysDiff").as("listDaysDiff"))
      .withColumn("stdDaysDiff", stdDiff($"listDaysDiff"))
      .select("uid", "stdDaysDiff")

    // Per-(uid, sid) play counts; cap at 200 to drop bot-like outliers.
    val group_count_listen = ds.groupBy("uid", "sid")
      .count()
      .withColumnRenamed("count", "count_listen")
      .filter($"count_listen" < 200)

    // Number of distinct songs a user has played (after the cap above).
    val group_avg_count_listen = group_count_listen.groupBy("uid")
      .count()
      .withColumnRenamed("count", "total_listen")

    // Share of a user's plays that went to each song.
    val group_avg_count_listen_song = group_count_listen.join(group_avg_count_listen, Seq("uid"), "inner")
      .withColumn("avg_listen_ratio", $"count_listen" / $"total_listen")
      .select("uid", "sid", "count_listen", "avg_listen_ratio")

    // Users with a song catalogue below 300 distinct tracks.
    val group_count_songs = ds.groupBy("uid")
      .agg(countDistinct("sid").as("count_song"))
      .filter($"count_song" < 300)

    // Days since the most recent / span between first and last play of each song.
    val group_nearest_listen = ds.groupBy("uid", "sid")
      .agg(min("daysDiff").as("nearest_time"))
    val group_longest_listen = ds.groupBy("uid", "sid")
      .agg((max("daysDiff") - min("daysDiff")).as("longest_time"))

    // Days since the user's most recent activity of any kind.
    val group_last_listen = ds.groupBy("uid")
      .agg(min("daysDiff").as("last_time"))

    // Number of distinct days a song was played, and average listen ratio.
    val group_diff_days = ds.groupBy("uid", "sid")
      .agg(countDistinct("daysDiff").as("noDays"))
    val group_listen_ratio = ds.groupBy("uid", "sid")
      .agg(avg("ratio").as("listenRatio"))

    // Fraction of plays that passed the 70% completion threshold.
    val group_fully_listen_ratio = ds.groupBy("uid", "sid")
      .agg((sum("fullyListen") / count("fullyListen")).as("fullyListenRatio"))

    // User age, clamped to the plausible [12, 50] range.
    val group_age = ds.groupBy("uid")
      .agg(max("age").as("age"))
      .withColumn("age", when($"age" < 12, 12).when($"age" > 50, 50).otherwise($"age"))

    // Whether the song was ever downloaded, and the mean playback volume.
    val group_is_downloaded = ds.groupBy("uid", "sid")
      .agg(max("isDownloaded").as("isDownloaded"))
    val group_volume = ds.groupBy("uid", "sid")
      .agg(mean("volume").as("volume"))

    // Assemble the feature table with inner joins — a (uid, sid) survives only
    // if it passed every per-group filter above.
    val ds_merged_1 = group_avg_count_listen_song
      .join(group_nearest_listen, Seq("uid", "sid"), "inner")
      .join(group_longest_listen, Seq("uid", "sid"), "inner")
    val ds_merged_2 = ds_merged_1
      .join(group_diff_days, Seq("uid", "sid"), "inner")
      .join(group_listen_ratio, Seq("uid", "sid"), "inner")
    val ds_merged_3 = ds_merged_2
      .join(group_last_listen_ratio, Seq("uid", "sid"), "inner")
      .join(group_fully_listen_ratio, Seq("uid", "sid"), "inner")
    val ds_merged_4 = ds_merged_3
      .join(group_volume, Seq("uid", "sid"), "inner")
      .join(group_count_songs, Seq("uid"), "inner")
    val ds_merged_5 = ds_merged_4
      .join(group_age, Seq("uid"), "inner")
      .join(group_last_listen, Seq("uid"), "inner")
    val ds_merged = ds_merged_5
      .join(group_listen_time_std, Seq("uid"), "inner")
      .join(group_is_downloaded, Seq("uid", "sid"), "inner")

    // Reduce partition count for the downstream writer.
    ds_merged.coalesce(35)
  }
}
/** First user-id shard: users below 1 010 000 000. */
object FeatureExtractionFirstPart extends FeatureExtractionFromLog {
  def filter: String = "user_id < 1010000000"
}
/** Middle user-id shard: users in [1 010 000 000, 1 020 000 000). */
object FeatureExtractionSecondPart extends FeatureExtractionFromLog {
  def filter: String = "user_id >= 1010000000 and user_id < 1020000000"
}
/** Last user-id shard: users at or above 1 020 000 000. */
object FeatureExtractionThirdPart extends FeatureExtractionFromLog {
  def filter: String = "user_id >= 1020000000"
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement