package com.chamberlain.bda.util

/**
 * @author Anagha Khanolkar [Microsoft]
 */

import java.net.URI

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object CompactParsedLogs {

  //Main function
  def main(args: Array[String]): Unit = {

    //Check for the correct number of command-line arguments
    if (args.length != 6) {
      println("Please provide 6 parameters: <inputDirectory> <outputDirectory> <minFilesToCompact> <minCompactedFileSizeInMB> <oozieWorkflowID> <logLevel>")
      System.exit(1)
    }

    //Capture command-line arguments
    val inputDirectory = args(0)                 //Logs to be compacted
    val outputDirectory = args(1)                //Compacted logs
    val minFilesToCompact = args(2).toInt        //Minimum number of files required to run compaction
    val minCompactedFileSizeInMB = args(3).toInt //E.g. 128 compacts into files of roughly 128 MB each
    val oozieWorkflowID = args(4)                //So we can correlate the batch with the workflow when we log status
    val logLevel = args(5)

    //Spark session
    val sparkSession: SparkSession = SparkSession.builder().master("yarn").getOrCreate()

    //Recursive glob support (set directly on the Hadoop configuration, so no "spark.hadoop." prefix)
    sparkSession.sparkContext.hadoopConfiguration.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true)

    //Log level
    sparkSession.sparkContext.setLogLevel(logLevel)

    //FileStatus - used to scope the directories targeted for compaction and subsequent deletion
    val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
    val inputDirFileStatusArr = fs.listStatus(new Path(inputDirectory))

    //Proceed only if we have enough files to make compaction worthwhile
    if (inputDirFileStatusArr.length > minFilesToCompact) {
      var errorString = ""

      //Delete the output directory if it already exists
      val outputPath = outputDirectory + "/" + oozieWorkflowID
      val dstFileSystem = FileSystem.get(new URI(outputPath), sparkSession.sparkContext.hadoopConfiguration)
      dstFileSystem.delete(new Path(outputPath), true)

      try {
        //(1) Get the input size to determine how many output files to coalesce to
        val inputDirSize = fs.getContentSummary(new Path(inputDirectory)).getLength
        var outputFileCount = (inputDirSize / (minCompactedFileSizeInMB.toLong * 1024 * 1024)).toInt
        if (outputFileCount == 0) outputFileCount = 1

        //(2) Read the input, coalesce, and persist to the output directory named after the Oozie workflow ID
        sparkSession.read.parquet(inputDirectory).coalesce(outputFileCount).write.parquet(outputPath)

        //(3) No errors, so delete the raw logs by iterating over the FileStatus array of the input directory
        inputDirFileStatusArr.foreach { fileStatus =>
          fs.delete(fileStatus.getPath, true)
        }

      } catch {
        case e: Exception =>
          errorString = e.toString + "\n" + e.getStackTrace.mkString("\n")
      } finally {
        //TODO: Log somewhere - workflowID | start timestamp | end timestamp | status | errorString | directories compacted count | directory size | file count and so on
      }
    }
  }
}