Advertisement
Guest User

Untitled

a guest
Oct 17th, 2018
106
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.55 KB | None | 0 0
  1. package com.vng.zing.zudm_zingmp3_suggestion.relistening_prediction
  2.  
  3. import org.apache.spark.sql.SQLContext
  4. import org.apache.spark.sql.Dataset
  5. import org.apache.spark.sql.Row
  6. import com.vng.zing.zudm_zingmp3_suggestion.spark_io.DFGenericInputOutputTransformation
  7. import org.apache.spark.sql.types.{ StructType, LongType, IntegerType, BooleanType, StringType, TimestampType, DoubleType }
  8. import org.apache.spark.sql.functions._
  9. import org.apache.spark.sql.expressions.Window
  10.  
  11. object FeatureExtractionFromLog extends DFGenericInputOutputTransformation {
  12. // def filter: String
  13.  
  14. def transformation(sc: SQLContext, input: Dataset[Row]): Dataset[Row] = {
  15.  
  16. import sc.implicits._
  17.  
  18. input.sqlContext.setConf("spark.sql.crossJoin.enabled", "true")
  19.  
  20. val timestamp_range = 1538844955000L
  21.  
  22. val uids = List(1020030296, // ngocbk
  23. // 1005987278, // danhd
  24. 1018617591 //trunglh2
  25. // 1004523473, // tunm
  26. // 1000979242 // quitct
  27. )
  28.  
  29. // val input2 = input.filter($"user_id" > 1000000000 && $"user_id" < 1000050000 && $"user_id".isin(uids:_*))
  30.  
  31. val input2 = input.filter($"user_id".isin(uids:_*))
  32.  
  33. input2.show
  34.  
  35. val ds = input
  36. // .filter("user_id >= 1010000000 and user_id < 1020000000")
  37. // .filter(filter)
  38. .withColumn("pos", $"pos".cast(DoubleType))
  39. .withColumn("duration", $"duration".cast(DoubleType))
  40. .withColumn("fullyListen", ($"pos" / $"duration" >= 0.7).cast(IntegerType))
  41. .withColumn("ratio", $"pos" / $"duration")
  42. .withColumn("date", from_unixtime($"timestamp" / 1000, "yyyy-MM-dd HH:mm:ss"))
  43. .withColumn("dateTmp", from_unixtime(lit(timestamp_range / 1000), "yyyy-MM-dd HH:mm:ss"))
  44. .withColumn("daysDiff", datediff($"dateTmp", $"date"))
  45. .withColumn("isDownloaded", $"isDownloaded".cast(IntegerType))
  46. .withColumn("age", $"age".cast(IntegerType))
  47. .withColumn("type", $"type".cast(IntegerType))
  48. .withColumn("volume", $"volume".cast(IntegerType))
  49. .filter($"type" === 1 || $"type" === 2)
  50. .na.fill(0, Seq("isDownloaded"))
  51. .na.fill(50, Seq("volume"))
  52. .na.fill(25, Seq("age"))
  53. .na.fill(0, Seq("fullyListen"))
  54. .na.fill(0.5, Seq("ratio"))
  55. .select(
  56. $"user_id".as("uid"),
  57. $"id".as("sid"),
  58. $"isDownloaded",
  59. $"volume",
  60. $"age",
  61. $"fullyListen",
  62. $"ratio",
  63. $"daysDiff")
  64.  
  65. println("DS...")
  66.  
  67. val byLastListen = Window.partitionBy($"uid", $"sid").orderBy($"daysDiff".asc)
  68.  
  69. val group_last_listen_ratio = ds.withColumn("rn", row_number.over(byLastListen))
  70. .where($"rn" === 1)
  71. .drop("rn")
  72. .select($"uid", $"sid", $"ratio".as("lastListenRatio"))
  73.  
  74. println("group_last_listen_ratio...")
  75.  
  76. val stdDiff = udf((days: Seq[Int]) => {
  77.  
  78. if (days.length == 1) {
  79. 0.0
  80. } else {
  81. val daysSet = days.toSet.toList
  82.  
  83. var dist: List[Int] = List()
  84.  
  85. var i = 0;
  86.  
  87. for (i <- 0 until daysSet.length - 1) {
  88. dist = Math.abs(daysSet(i) - daysSet(i + 1)) :: dist
  89. }
  90.  
  91. val mean = 1.0 / dist.length * dist.reduce(_ + _)
  92. var std = 0.0
  93.  
  94. for (i <- 0 until dist.length) {
  95. std += (mean - dist(i)) * (mean - dist(i))
  96. }
  97.  
  98. std = Math.sqrt(std * 1.0 / (days.length))
  99. std
  100. }
  101. })
  102.  
  103. val group_listen_time_std = ds.groupBy("uid").agg(collect_set("daysDiff").as("listDaysDiff"))
  104. .withColumn("stdDaysDiff", stdDiff($"listDaysDiff"))
  105. .select("uid", "stdDaysDiff")
  106.  
  107. println("group_listen_time_std...")
  108.  
  109. val group_count_listen = ds.groupBy("uid", "sid")
  110. .count()
  111. .withColumnRenamed("count", "count_listen")
  112. // .filter($"count_listen" < 200)
  113.  
  114. println("group_count_listen...")
  115.  
  116. val group_avg_count_listen = group_count_listen.groupBy("uid")
  117. .count()
  118. .withColumnRenamed("count", "total_listen")
  119.  
  120. println("group_avg_count_listen...")
  121.  
  122. val group_avg_count_listen_song = group_count_listen.join(group_avg_count_listen, Seq("uid"), "inner")
  123. .withColumn("avg_listen_ratio", $"count_listen" / $"total_listen")
  124. .select("uid", "sid", "count_listen", "avg_listen_ratio")
  125.  
  126. println("group_avg_count_listen_song...")
  127.  
  128. val group_count_songs = ds.groupBy("uid")
  129. .agg(countDistinct("sid").as("count_song"))
  130. // .filter($"count_song" < 300)
  131.  
  132. println("group_count_songs...")
  133.  
  134. val group_nearest_listen = ds.groupBy("uid", "sid")
  135. .agg(min("daysDiff").as("nearest_time"))
  136.  
  137. println("group_nearest_listen...")
  138.  
  139. val group_longest_listen = ds.groupBy("uid", "sid")
  140. .agg((max("daysDiff") - min("daysDiff")).as("longest_time"))
  141.  
  142. println("group_longest_listen...")
  143.  
  144. val group_last_listen = ds.groupBy("uid")
  145. .agg(min("daysDiff").as("last_time"))
  146.  
  147. println("group_last_listen...")
  148.  
  149. val group_diff_days = ds.groupBy("uid", "sid")
  150. .agg(countDistinct("daysDiff").as("noDays"))
  151.  
  152. println("group_diff_days...")
  153.  
  154. val group_listen_ratio = ds.groupBy("uid", "sid")
  155. .agg(avg("ratio").as("listenRatio"))
  156.  
  157. println("group_listen_ratio...")
  158.  
  159. val group_fully_listen_ratio = ds.groupBy("uid", "sid")
  160. .agg((sum("fullyListen") / count("fullyListen")).as("fullyListenRatio"))
  161.  
  162. println("group_fully_listen_ratio...")
  163.  
  164. val group_age = ds.groupBy("uid")
  165. .agg(max("age").as("age"))
  166. .withColumn("age", when($"age" < 12, 12).when($"age" > 50, 50).otherwise($"age"))
  167.  
  168. println("group_age...")
  169.  
  170. val group_is_downloaded = ds.groupBy("uid", "sid")
  171. .agg(max("isDownloaded").as("isDownloaded"))
  172.  
  173. println("group_is_downloaded...")
  174.  
  175. val group_volume = ds.groupBy("uid", "sid")
  176. .agg(mean("volume").as("volume"))
  177.  
  178. println("group_volume...")
  179.  
  180. println("joining started...")
  181.  
  182. val ds_merged = group_avg_count_listen_song
  183. .join(group_nearest_listen, Seq("uid", "sid"), "inner")
  184. .join(group_longest_listen, Seq("uid", "sid"), "inner")
  185. .join(group_diff_days, Seq("uid", "sid"), "inner")
  186. .join(group_listen_ratio, Seq("uid", "sid"), "inner")
  187. .join(group_last_listen_ratio, Seq("uid", "sid"), "inner")
  188. .join(group_fully_listen_ratio, Seq("uid", "sid"), "inner")
  189. .join(group_volume, Seq("uid", "sid"), "inner")
  190. .join(group_count_songs, Seq("uid"), "inner")
  191. .join(group_age, Seq("uid"), "inner")
  192. .join(group_last_listen, Seq("uid"), "inner")
  193. .join(group_listen_time_std, Seq("uid"), "inner")
  194. .join(group_is_downloaded, Seq("uid", "sid"), "inner")
  195.  
  196. ds_merged.filter($"uid".isin(uids:_*))
  197. .show(false)
  198.  
  199. val ds_merged_uids = ds_merged.select("uid").collect().map(_(0)).toSet
  200.  
  201. // println("ds_merged_uids")
  202. // println(ds_merged_uids.mkString(","))
  203.  
  204. val input_uids = input.select("user_id").collect().map(_(0)).toSet
  205.  
  206. // println("input_uids")
  207. // println(input_uids.mkString(","))
  208.  
  209. println("dif")
  210. val dif = input_uids diff ds_merged_uids
  211.  
  212. println(dif.mkString(","))
  213.  
  214. null
  215. }
  216.  
  217. }
  218.  
  219. //object FeatureExtractionFromLogFirstPart extends FeatureExtractionFromLog{
  220. // def filter = "user_id < 1010000000"
  221. //}
  222. //
  223. //object FeatureExtractionFromLogSecondPart extends FeatureExtractionFromLog{
  224. // def filter = "user_id >= 1010000000 and user_id < 1020000000"
  225. //}
  226. //
  227. //object FeatureExtractionFromLogThirdPart extends FeatureExtractionFromLog{
  228. // def filter = "user_id >= 1020000000"
  229. //}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement