Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.vng.zing.zudm_zingmp3_suggestion.relistening_prediction
- import org.apache.spark.sql.SQLContext
- import org.apache.spark.sql.Dataset
- import org.apache.spark.sql.Row
- import com.vng.zing.zudm_zingmp3_suggestion.spark_io.DFGenericInputOutputTransformation
- import org.apache.spark.sql.types.{ StructType, LongType, IntegerType, BooleanType, StringType, TimestampType, DoubleType }
- import org.apache.spark.sql.functions._
- import org.apache.spark.sql.expressions.Window
/**
 * Extracts per-user and per-(user, song) listening features from raw listen-log rows.
 *
 * The input is read for the columns `user_id`, `id`, `pos`, `duration`, `timestamp`,
 * `isDownloaded`, `age`, `type` and `volume`. Only rows whose `type` is 1 or 2 are kept.
 */
object FeatureExtractionFromLog extends DFGenericInputOutputTransformation {
  // def filter: String

  /**
   * Builds the feature table by aggregating the log in several independent ways and
   * inner-joining the aggregates on `uid` / (`uid`, `sid`).
   *
   * Besides building the result, this method prints progress markers, shows sample rows
   * for a fixed list of user ids, and prints the user ids present in `input` but absent
   * from the merged result. Those diagnostics trigger Spark actions.
   *
   * @param sc    SQL context, used for its implicits (`$"col"` syntax)
   * @param input raw listen-log rows
   * @return one row per (uid, sid) with the columns: count_listen, avg_listen_ratio,
   *         nearest_time, longest_time, noDays, listenRatio, lastListenRatio,
   *         fullyListenRatio, volume, count_song, age, last_time, stdDaysDiff, isDownloaded
   */
  def transformation(sc: SQLContext, input: Dataset[Row]): Dataset[Row] = {
    import sc.implicits._
    input.sqlContext.setConf("spark.sql.crossJoin.enabled", "true")

    // Reference instant (epoch milliseconds); `daysDiff` is measured back from this date.
    val timestamp_range = 1538844955000L

    // User ids whose rows are printed for manual inspection.
    val uids = List(
      1020030296, // ngocbk
      // 1005987278, // danhd
      1018617591 // trunglh2
      // 1004523473, // tunm
      // 1000979242 // quitct
    )

    // val input2 = input.filter($"user_id" > 1000000000 && $"user_id" < 1000050000 && $"user_id".isin(uids:_*))
    val input2 = input.filter($"user_id".isin(uids: _*))
    input2.show()

    // Normalised log: typed columns, derived ratio / recency columns, null defaults.
    val ds = input
      // .filter("user_id >= 1010000000 and user_id < 1020000000")
      // .filter(filter)
      .withColumn("pos", $"pos".cast(DoubleType))
      .withColumn("duration", $"duration".cast(DoubleType))
      // 1 when at least 70% of the track was played, else 0.
      .withColumn("fullyListen", ($"pos" / $"duration" >= 0.7).cast(IntegerType))
      .withColumn("ratio", $"pos" / $"duration")
      .withColumn("date", from_unixtime($"timestamp" / 1000, "yyyy-MM-dd HH:mm:ss"))
      .withColumn("dateTmp", from_unixtime(lit(timestamp_range / 1000), "yyyy-MM-dd HH:mm:ss"))
      // Whole days between the listen and the reference date.
      .withColumn("daysDiff", datediff($"dateTmp", $"date"))
      .withColumn("isDownloaded", $"isDownloaded".cast(IntegerType))
      .withColumn("age", $"age".cast(IntegerType))
      .withColumn("type", $"type".cast(IntegerType))
      .withColumn("volume", $"volume".cast(IntegerType))
      .filter($"type" === 1 || $"type" === 2)
      .na.fill(0, Seq("isDownloaded"))
      .na.fill(50, Seq("volume"))
      .na.fill(25, Seq("age"))
      .na.fill(0, Seq("fullyListen"))
      .na.fill(0.5, Seq("ratio"))
      .select(
        $"user_id".as("uid"),
        $"id".as("sid"),
        $"isDownloaded",
        $"volume",
        $"age",
        $"fullyListen",
        $"ratio",
        $"daysDiff")
    println("DS...")

    // Listen ratio of the row with the smallest daysDiff per (uid, sid).
    // NOTE(review): daysDiff has day granularity, so several listens on the same day tie
    // and row_number picks one of them arbitrarily - confirm this is acceptable.
    val byLastListen = Window.partitionBy($"uid", $"sid").orderBy($"daysDiff".asc)
    val group_last_listen_ratio = ds
      .withColumn("rn", row_number().over(byLastListen))
      .where($"rn" === 1)
      .drop("rn")
      .select($"uid", $"sid", $"ratio".as("lastListenRatio"))
    println("group_last_listen_ratio...")

    // Standard deviation of the gaps (in days) between consecutive distinct listen days.
    // Returns 0.0 when there are fewer than two distinct days, i.e. no gap exists.
    val stdDiff = udf((days: Seq[Int]) => {
      // collect_set gives no ordering guarantee, so sort before taking consecutive gaps.
      val sortedDays = Option(days).getOrElse(Seq.empty[Int]).distinct.sorted
      if (sortedDays.length < 2) {
        0.0
      } else {
        val gaps = sortedDays.zip(sortedDays.tail).map { case (earlier, later) =>
          (later - earlier).toDouble
        }
        val mean = gaps.sum / gaps.length
        // Population variance over the gaps themselves (same denominator as the mean).
        val variance = gaps.map(gap => (gap - mean) * (gap - mean)).sum / gaps.length
        Math.sqrt(variance)
      }
    })

    val group_listen_time_std = ds.groupBy("uid")
      .agg(collect_set("daysDiff").as("listDaysDiff"))
      .withColumn("stdDaysDiff", stdDiff($"listDaysDiff"))
      .select("uid", "stdDaysDiff")
    println("group_listen_time_std...")

    // Number of log rows per (uid, sid).
    val group_count_listen = ds.groupBy("uid", "sid")
      .count()
      .withColumnRenamed("count", "count_listen")
    // .filter($"count_listen" < 200)
    println("group_count_listen...")

    // NOTE(review): this counts the (uid, sid) rows per user, i.e. the number of distinct
    // songs, not the sum of listens that the name "total_listen" suggests - confirm intent.
    val group_avg_count_listen = group_count_listen.groupBy("uid")
      .count()
      .withColumnRenamed("count", "total_listen")
    println("group_avg_count_listen...")

    val group_avg_count_listen_song = group_count_listen
      .join(group_avg_count_listen, Seq("uid"), "inner")
      .withColumn("avg_listen_ratio", $"count_listen" / $"total_listen")
      .select("uid", "sid", "count_listen", "avg_listen_ratio")
    println("group_avg_count_listen_song...")

    // Distinct songs per user.
    val group_count_songs = ds.groupBy("uid")
      .agg(countDistinct("sid").as("count_song"))
    // .filter($"count_song" < 300)
    println("group_count_songs...")

    // Smallest daysDiff per (uid, sid).
    val group_nearest_listen = ds.groupBy("uid", "sid")
      .agg(min("daysDiff").as("nearest_time"))
    println("group_nearest_listen...")

    // Span in days between the largest and smallest daysDiff per (uid, sid).
    val group_longest_listen = ds.groupBy("uid", "sid")
      .agg((max("daysDiff") - min("daysDiff")).as("longest_time"))
    println("group_longest_listen...")

    // Smallest daysDiff per user across all songs.
    val group_last_listen = ds.groupBy("uid")
      .agg(min("daysDiff").as("last_time"))
    println("group_last_listen...")

    // Number of distinct daysDiff values per (uid, sid).
    val group_diff_days = ds.groupBy("uid", "sid")
      .agg(countDistinct("daysDiff").as("noDays"))
    println("group_diff_days...")

    // Mean played fraction per (uid, sid).
    val group_listen_ratio = ds.groupBy("uid", "sid")
      .agg(avg("ratio").as("listenRatio"))
    println("group_listen_ratio...")

    // Share of listens per (uid, sid) flagged as fully listened.
    val group_fully_listen_ratio = ds.groupBy("uid", "sid")
      .agg((sum("fullyListen") / count("fullyListen")).as("fullyListenRatio"))
    println("group_fully_listen_ratio...")

    // Largest age seen per user, clamped to the range [12, 50].
    val group_age = ds.groupBy("uid")
      .agg(max("age").as("age"))
      .withColumn("age", when($"age" < 12, 12).when($"age" > 50, 50).otherwise($"age"))
    println("group_age...")

    // 1 when any row of the (uid, sid) pair has isDownloaded = 1.
    val group_is_downloaded = ds.groupBy("uid", "sid")
      .agg(max("isDownloaded").as("isDownloaded"))
    println("group_is_downloaded...")

    // Mean volume per (uid, sid).
    val group_volume = ds.groupBy("uid", "sid")
      .agg(mean("volume").as("volume"))
    println("group_volume...")

    println("joining started...")
    val ds_merged = group_avg_count_listen_song
      .join(group_nearest_listen, Seq("uid", "sid"), "inner")
      .join(group_longest_listen, Seq("uid", "sid"), "inner")
      .join(group_diff_days, Seq("uid", "sid"), "inner")
      .join(group_listen_ratio, Seq("uid", "sid"), "inner")
      .join(group_last_listen_ratio, Seq("uid", "sid"), "inner")
      .join(group_fully_listen_ratio, Seq("uid", "sid"), "inner")
      .join(group_volume, Seq("uid", "sid"), "inner")
      .join(group_count_songs, Seq("uid"), "inner")
      .join(group_age, Seq("uid"), "inner")
      .join(group_last_listen, Seq("uid"), "inner")
      .join(group_listen_time_std, Seq("uid"), "inner")
      .join(group_is_downloaded, Seq("uid", "sid"), "inner")

    ds_merged.filter($"uid".isin(uids: _*))
      .show(false)

    // Diagnostic: user ids present in the input but missing from the merged features.
    // De-duplicate on the cluster so only one value per user reaches the driver.
    val ds_merged_uids = ds_merged.select("uid").distinct().collect().map(_(0)).toSet
    // println("ds_merged_uids")
    // println(ds_merged_uids.mkString(","))
    val input_uids = input.select("user_id").distinct().collect().map(_(0)).toSet
    // println("input_uids")
    // println(input_uids.mkString(","))
    println("dif")
    val dif = input_uids diff ds_merged_uids
    println(dif.mkString(","))

    // Hand the merged feature table back to the caller instead of null.
    ds_merged
  }
}
- //object FeatureExtractionFromLogFirstPart extends FeatureExtractionFromLog{
- // def filter = "user_id < 1010000000"
- //}
- //
- //object FeatureExtractionFromLogSecondPart extends FeatureExtractionFromLog{
- // def filter = "user_id >= 1010000000 and user_id < 1020000000"
- //}
- //
- //object FeatureExtractionFromLogThirdPart extends FeatureExtractionFromLog{
- // def filter = "user_id >= 1020000000"
- //}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement