%spark
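// Per-day extraction of apiStat events (stream_first_page and photo_downloaded)
// for OK Lite users from the Kafka dumps on the backup HDFS cluster, written
// out as per-date parquet partitions plus CSV exports of the aggregates.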
import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import odkl.analysis.job.kafka.MessageWritable
import odkl.analysis.spark.util.DateRange
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.NullWritable
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.types.StructType
// Needed for col/when/explode and the $"..." column syntax below; harmless
// no-ops if the Zeppelin interpreter already imports them.
import org.apache.spark.sql.functions._
import sqlContext.implicits._
// Date format used in the parquet directory names; the same value is read
// back into the date column of every dataset.
val parquetDateFormat = new SimpleDateFormat("yyyy-MM-dd")
// Kafka dump directories are named yyyyMMdd_00_15; the _00_15 suffix is a
// literal part of the name (digits and underscores are literals in
// SimpleDateFormat patterns), not a time-of-day pattern.
val sequenceDateFormat = new SimpleDateFormat("yyyyMMdd_00_15")
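// Distinct OK Lite user ids are collected to the driver and inlined into a
// SQL IN (...) filter below -- this assumes the set is small enough to fit
// both in driver memory and in the generated query string.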
val okLiteActivityData = sqlContext.read.parquet("/tmp/trunin/okLiteActivity")
val okLiteUsers = okLiteActivityData
  .select("userId")
  .distinct
  .map(r => r(0))
  .collect
  .toSeq
// FileSystem handle for the backup cluster that holds the Kafka dumps.
val fs = FileSystem.get(new URI("hdfs://datalab-hadoop-backup"), sc.hadoopConfiguration)
def processApiStats(date: Date): Unit = {
  // The dump for day D (taken at 00:15, judging by the directory suffix)
  // presumably covers the previous day's events, so it is written to the
  // date=D-1 parquet partition.
  val parquetDate = Calendar.getInstance
  parquetDate.setTime(date)
  parquetDate.add(Calendar.DAY_OF_MONTH, -1)
  val parquetDateStr = parquetDateFormat.format(parquetDate.getTime)

  val apiStatPath = "/kafka/published/" + sequenceDateFormat.format(date) + "/apiStat"
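  // Infer the JSON schema from a single part file, then parse the full day's
  // dump with that fixed schema -- this avoids a second full pass over the
  // whole directory just for schema inference.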
  val firstFile = fs.listStatus(new Path(apiStatPath))
    .map(v => v.getPath.toString)
    .find(v => v.contains("part"))
    .get

  val data1 = sc.sequenceFile(firstFile,
      classOf[NullWritable],
      classOf[MessageWritable])
    .map(k => k._2.toString)
    .filter(k => k.contains("stream_first_page") || k.contains("photo_downloaded"))
  val data2 = sqlContext.read.json(data1)
  val dataSchema: StructType = data2.schema

  val fullRDD = sc.sequenceFile("hdfs://datalab-hadoop-backup" + apiStatPath,
      classOf[NullWritable],
      classOf[MessageWritable])
    .map(k => k._2.toString)
    .filter(k => k.contains("stream_first_page") || k.contains("photo_downloaded"))
  val fullData = sqlContext.read.schema(dataSchema).json(fullRDD)
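  // Keep only events from OK Lite users and the two application ids of
  // interest (presumably OK Lite builds), explode the batched items array,
  // and prefer the per-item userId over the log-level one when present.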
  val filteredData = fullData
    .filter("userId IN (" + okLiteUsers.mkString(",") + ")")
    .filter($"applicationId" === 25662464 || $"applicationId" === 1262608128)
    .select(col("applicationId"), col("data.items"), col("userId"))
    .withColumn("item", explode($"items"))
    .select(col("userId").as("logUserId"),
      col("item.userId").as("itemUserId"),
      col("applicationId"),
      col("item.operation"),
      col("item.time"),
      col("item.network"),
      col("item.data"))
    .filter("(operation LIKE 'stream_first_page%') OR (operation = 'photo_downloaded')")
    .withColumn("userId", when(col("itemUserId").isNotNull, $"itemUserId").otherwise($"logUserId"))
    .filter(col("userId").isNotNull)
    .select(col("userId"), col("applicationId"), col("operation"), col("time"),
      col("network"), col("data").getItem(0).as("data0"), col("data").getItem(1).as("data1"))
  val basePath = "/tmp/trunin/apiStatsPublished_19apr"
  val parquetPath = basePath + "/date=" + parquetDateStr
  val photoParquetPath = basePath + "/photoDownload/date=" + parquetDateStr
  val csvPath = basePath + ".csv/date=" + parquetDateStr
  val photoCsvPath = basePath + ".csv/photoDownload.csv/date=" + parquetDateStr
  val streamFirstPageParquetPath = basePath + "/streamFirstPage/date=" + parquetDateStr
  val streamFirstPageCsvPath = basePath + ".csv/streamFirstPage.csv/date=" + parquetDateStr
  filteredData
    .repartition(10)
    .write
    .mode(SaveMode.Overwrite)
    .parquet(parquetPath)

  // CSV export of the raw filtered events is disabled; only the aggregates
  // below are exported to CSV.
  /*
  sqlContext.read.parquet(parquetPath)
    .repartition(1)
    .write
    .mode(SaveMode.Overwrite)
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save(csvPath)
  */
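  // pivot(column, values) yields one count column per listed value; passing
  // the value list explicitly keeps the output schema identical across days
  // even when some value is absent from a given day's data.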
  val photoDownloadNetworkStats = sqlContext.read.parquet(parquetPath)
    .filter($"operation" === "photo_downloaded")
    .groupBy($"userId")
    .pivot("network", Seq("poor", "moderate", "good", "excellent"))
    .count
  val photoDownloadOutcomeStats = sqlContext.read.parquet(parquetPath)
    .filter($"operation" === "photo_downloaded")
    .groupBy($"userId")
    .pivot("data1", Seq("success", "failure"))
    .count
  val photoDownloadStats = photoDownloadNetworkStats
    .join(photoDownloadOutcomeStats, Seq("userId"), "outer")
  photoDownloadStats
    .repartition(10)
    .write
    .mode(SaveMode.Overwrite)
    .parquet(photoParquetPath)
  sqlContext.read.parquet(photoParquetPath)
    .repartition(1)
    .write
    .mode(SaveMode.Overwrite)
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save(photoCsvPath)
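  // Second report: raw stream_first_page events per user, kept unaggregated.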
  val streamFirstPageData = sqlContext.read.parquet(parquetPath)
    .filter("operation LIKE 'stream_first_page%'")
    .select("userId", "applicationId", "operation", "time")
  streamFirstPageData
    .repartition(10)
    .write
    .mode(SaveMode.Overwrite)
    .parquet(streamFirstPageParquetPath)
  sqlContext.read.parquet(streamFirstPageParquetPath)
    .repartition(1)
    .write
    .mode(SaveMode.Overwrite)
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save(streamFirstPageCsvPath)
}
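// Backfill the whole range, one day per iteration. DateRange is an
// odkl-internal helper; it is assumed here to iterate over calendar days.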
for (d <- DateRange("2018-02-01:2018-04-19").iterator) {
  println("Processing date: " + d)
  processApiStats(d.toDate)
}
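
// A quick spot check of one day's output (a sketch, left commented out like
// the CSV block above; the date below is just an example value):
// sqlContext.read
//   .parquet("/tmp/trunin/apiStatsPublished_19apr/photoDownload/date=2018-04-18")
//   .show(20)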