Advertisement
Guest User

zzzzz

a guest
Jan 17th, 2018
84
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 1.39 KB | None | 0 0
  // Ad-hoc Spark shell script: joins 2017-12-12 session records with the segment index and
  // keeps only sessions whose visitor id appears in the idx file under a strat other than "12264".
  //
  // Expects a SparkContext named `sc` in scope. `$"..."` and `.toDF` additionally need the SQL
  // implicits in scope -- presumably provided by the shell; verify when running elsewhere.
  //
  // Method chains below end each line with a trailing dot so that no statement looks complete
  // before its last line; a leading-dot continuation breaks when pasted line-by-line into a REPL.
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  // Sessions that have a visitor id and whose location contains ".perekrestok.ru".
  val df = sqlContext.read.parquet("/user/zhinkin/sessions_parquet/2017-12-12.parquet/sessions-r-*.gz.parquet")
  val resDF = df.
    filter($"_vid".isNotNull && $"location".contains(".perekrestok.ru")).
    select("_vid", "depth", "duration_ms", "bounce", "utm_source")

  // Segment index: key is "seg<TAB>value", value is "_vid<TAB>inSegCount".
  // String.split drops trailing empty strings, so a record whose last field is empty would
  // yield a one-element array and the Array(a, b) pattern would throw scala.MatchError.
  // padTo(2, "") restores the missing empty field(s); records with more than two fields still
  // fail loudly, exactly as before.
  val seqDF = sc.sequenceFile[String, String]("/user/reporter/seg-idx/2017-12-12.seq").map {
    case (segAndValue, vidAndCount) =>
      val Array(seg, value) = segAndValue.split("\t").padTo(2, "")
      val Array(vid, inSegCount) = vidAndCount.split("\t").padTo(2, "")
      (seg, value, vid, inSegCount)
  }.toDF("seg", "value", "_vid", "inSegCount")

  // One row per (segment record, session) pair; rows lacking depth or duration are dropped.
  val joinedSessionDF = seqDF.
    join(resDF, Seq("_vid"), "inner").
    filter($"depth".isNotNull && $"duration_ms".isNotNull)

  // Visitor ids from the idx file: key is "event#strat", value is "_vid<TAB>times".
  // Only the strat and the visitor id are used; ids recorded under strat "12264" are dropped
  // and empty ids are discarded. The result may contain the same id many times.
  // NOTE(review): an id that also occurs under another strat is still kept -- confirm that
  // this is the intended meaning of "NoRet".
  val idxStratDF = sc.sequenceFile[String, String]("/user/reporter/idx/2017-12-12.seq").map {
    case (eventAndStrat, vidAndTimes) =>
      val Array(_, strat) = eventAndStrat.split("#").padTo(2, "")
      val Array(vid, _) = vidAndTimes.split("\t").padTo(2, "")
      (vid, strat)
  }.filter { case (_, strat) => strat != "12264" }.
    map(_._1).
    filter(_.nonEmpty).
    toDF("_vid")

  // idxStratDF only acts as a filter on _vid. An inner join would repeat every session row once
  // per matching idx record, so a left-semi join is used: same columns, no row multiplication.
  val joinedSessionNoRetDF = joinedSessionDF.
    join(idxStratDF, joinedSessionDF("_vid") === idxStratDF("_vid"), "leftsemi")

  joinedSessionNoRetDF.show(10)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement