Advertisement
Guest User

Untitled

a guest
Apr 23rd, 2018
73
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 5.84 KB | None | 0 0
  1. %spark
  2.  
  3. import java.net.URI
  4. import odkl.analysis.job.kafka.MessageWritable
  5. import org.apache.hadoop.fs.{FileSystem, Path}
  6. import org.apache.hadoop.io.NullWritable
  7. import org.apache.spark.sql.types.StructType
  8. import java.util.Date
  9. import java.util.Calendar
  10. import java.text.SimpleDateFormat
  11. import org.apache.spark.sql.SaveMode
  12. import odkl.analysis.spark.util.DateRange
  13.  
  14. // Формат даты, которая используется в названиях паркетов -- оттуда же она читается в колонку date во всех датасетах
  15. val parquetDateFormat = new SimpleDateFormat("yyyy-MM-dd")
  16. val sequenceDateFormat = new SimpleDateFormat("yyyyMMdd_00_15")
  17.  
  18. val okLiteActivityData = sqlContext.read.parquet("/tmp/trunin/okLiteActivity")
  19.  
  20. val okLiteUsers = okLiteActivityData
  21.     .select("userId")
  22.     .distinct
  23.     .map(r => r(0))
  24.     .collect
  25.     .toSeq
  26.  
  27. val fs = FileSystem.get(new URI("hdfs://datalab-hadoop-backup"), sc.hadoopConfiguration)
  28.  
  29. def processApiStats(date: Date) = {
  30.     val parquetDate = Calendar.getInstance
  31.     parquetDate.setTime(date)
  32.     parquetDate.add(Calendar.DAY_OF_MONTH, -1)
  33.     val parquetDateStr = parquetDateFormat.format(parquetDate.getTime)
  34.  
  35.     val apiStatPath = "/kafka/published/" + sequenceDateFormat.format(date) + "/apiStat"
  36.    
  37.     val firstFile = fs.listStatus(new Path(apiStatPath)).map(v => v.getPath.toString).find(v => v.contains("part")).get
  38.    
  39.     val data1 = sc.sequenceFile(firstFile,
  40.                                 classOf[NullWritable],
  41.                                 classOf[MessageWritable])
  42.         .map(k => k._2.toString)
  43.         .filter(k => k.contains("stream_first_page") || k.contains("photo_downloaded"))
  44.        
  45.     val data2 = sqlContext.read.json(data1)
  46.     val dataSchema = data2.schema
  47.    
  48.     val fullRDD = sc.sequenceFile("hdfs://datalab-hadoop-backup" + apiStatPath,
  49.                                   classOf[NullWritable],
  50.                                   classOf[MessageWritable])
  51.             .map(k => k._2.toString)
  52.             .filter(k => k.contains("stream_first_page") || k.contains("photo_downloaded"))
  53.            
  54.     val fullData = sqlContext.read.schema(dataSchema).json(fullRDD)
  55.      
  56.     val filteredData = fullData
  57.         .filter("userId IN (" + okLiteUsers.mkString(",") + ")")
  58.         .filter($"applicationId" === 25662464 || $"applicationId" === 1262608128)
  59.         .select(col("applicationId"), col("data.items"), col("userId"))
  60.         .withColumn("item", explode($"items"))
  61.         .select(col("userId").as("logUserId"),
  62.                 col("item.userId").as("itemUserId"),
  63.                 col("applicationId"),
  64.                 col("item.operation"),
  65.                 col("item.time"),
  66.                 col("item.network"),
  67.                 col("item.data"))
  68.         .filter("(operation LIKE 'stream_first_page%') OR (operation = 'photo_downloaded')")
  69.         .withColumn("userId", when(col("itemUserId").isNotNull, $"itemUserId").otherwise($"logUserId"))
  70.         .filter(col("userId").isNotNull)
  71.         .select(col("userId"), col("applicationId"), col("operation"),col("time"), col("network"), col("data").getItem(0).as("data0"), col("data").getItem(1).as("data1"))
  72.    
  73.     val basePath = "/tmp/trunin/apiStatsPublished_19apr"
  74.    
  75.     val parquetPath = basePath + "/date=" + parquetDateStr
  76.     val photoParquetPath = basePath + "/photoDownload/date=" + parquetDateStr
  77.     val csvPath = basePath + ".csv/date=" + parquetDateStr
  78.     val photoCsvPath = basePath + ".csv/photoDownload.csv/date=" + parquetDateStr
  79.     val streamFirstPageParquetPath = basePath + "/streamFirstPage/date=" + parquetDateStr
  80.     val streamFirstPageCsvPath = basePath + ".csv/streamFirstPage.csv/date=" + parquetDateStr
  81.    
  82.     filteredData
  83.         .repartition(10)
  84.         .write
  85.         .mode(SaveMode.Overwrite)
  86.         .parquet(parquetPath)
  87.    
  88.     /*    
  89.     sqlContext.read.parquet(parquetPath)
  90.         .repartition(1)
  91.         .write
  92.         .mode(SaveMode.Overwrite)
  93.         .format("com.databricks.spark.csv")
  94.         .option("header", "true")
  95.         .save(csvPath)    
  96.     */
  97.    
  98.     val photoDownloadNetworkStats = sqlContext.read.parquet(parquetPath)
  99.         .filter($"operation" === "photo_downloaded")
  100.         .groupBy($"userId")
  101.         .pivot("network", Seq("poor", "moderate", "good", "excellent"))
  102.         .count
  103.        
  104.     val photoDownloadOutcomeStats = sqlContext.read.parquet(parquetPath)
  105.         .filter($"operation" === "photo_downloaded")
  106.         .groupBy($"userId")
  107.         .pivot("data1", Seq("success", "failure"))
  108.         .count
  109.        
  110.     val photoDownloadStats = photoDownloadNetworkStats
  111.         .join(photoDownloadOutcomeStats, Seq("userId"), "outer")
  112.        
  113.        
  114.     photoDownloadStats
  115.         .repartition(10)
  116.         .write
  117.         .mode(SaveMode.Overwrite)
  118.         .parquet(photoParquetPath)
  119.    
  120.     sqlContext.read.parquet(photoParquetPath)
  121.         .repartition(1)
  122.         .write
  123.         .mode(SaveMode.Overwrite)
  124.         .format("com.databricks.spark.csv")
  125.         .option("header", "true")
  126.         .save(photoCsvPath)    
  127.    
  128.     val streamFirstPageData = sqlContext.read.parquet(parquetPath)
  129.         .filter("operation LIKE 'stream_first_page%'")
  130.         .select("userId", "applicationId", "operation", "time")
  131.        
  132.     streamFirstPageData
  133.         .repartition(10)
  134.         .write
  135.         .mode(SaveMode.Overwrite)
  136.         .parquet(streamFirstPageParquetPath)
  137.        
  138.     sqlContext.read.parquet(streamFirstPageParquetPath)
  139.         .repartition(1)
  140.         .write
  141.         .mode(SaveMode.Overwrite)
  142.         .format("com.databricks.spark.csv")
  143.         .option("header", "true")
  144.         .save(streamFirstPageCsvPath)
  145. }
  146.  
  147.  
  148. for (d <- DateRange("2018-02-01:2018-04-19").iterator) {
  149.     println("Processing date: " + d)
  150.     processApiStats(d.toDate)
  151. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement