package com.chamberlain.bda.util

/**
 * @author Anagha Khanolkar [Microsoft]
 */

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.SparkSession
import com.databricks.spark.avro._
object CompactParsedLogs {

  // Main function
  def main(args: Array[String]): Unit = {

    // Check for the correct number of command-line arguments
    if (args.length != 6) {
      println("Please provide 6 parameters: <inputDirectory> <outputDirectory> <minFilesToCompact> <minCompactedFileSizeInMB> <oozieWorkflowID> <logLevel>")
      System.exit(1)
    }

    // Capture command-line arguments
    val inputDirectory = args(0)                 // Logs to be compacted
    val outputDirectory = args(1)                // Compacted logs
    val minFilesToCompact = args(2).toInt        // Minimum number of files required to execute compaction
    val minCompactedFileSizeInMB = args(3).toInt // E.g. 128 will compact into ~128 MB files
    val oozieWorkflowID = args(4)                // So we can correlate the batch with the workflow when we log status
    val logLevel = args(5)
    // Spark session
    val sparkSession: SparkSession = SparkSession.builder().master("yarn").getOrCreate()
    import sparkSession.implicits._

    // Recursive glob support: the "spark.hadoop." prefix is only honored on SparkConf,
    // so set the Hadoop property directly on the Hadoop configuration
    sparkSession.sparkContext.hadoopConfiguration.setBoolean("mapreduce.input.fileinputformat.input.dir.recursive", true)

    // Log level
    sparkSession.sparkContext.setLogLevel(logLevel)
    // FileStatus - we will use this for scoping directories to target for compaction and subsequent deletion.
    // Use the Spark session's Hadoop configuration so the recursive setting above is respected.
    val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
    val inputDirFileStatusArr = fs.listStatus(new Path(inputDirectory))

    // Proceed with compaction only if we have enough files to compact
    if (inputDirFileStatusArr.length > minFilesToCompact) {
      var errorString = ""

      // Delete the output directory if it already exists
      val dstFileSystem = FileSystem.get(new URI(outputDirectory + "/" + oozieWorkflowID), sparkSession.sparkContext.hadoopConfiguration)
      dstFileSystem.delete(new Path(outputDirectory + "/" + oozieWorkflowID), true)
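      // Note: removing any previous output for this workflow ID above keeps re-runs of the same Oozie action idempotent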
      try {
        // (1) Get the input size to determine how many output files to coalesce to.
        // Use Long arithmetic for the target size so large minCompactedFileSizeInMB values cannot overflow Int,
        // and always produce at least one output file.
        val inputDirSize = fs.getContentSummary(new Path(inputDirectory)).getLength
        val outputFileCount = math.max(1, (inputDirSize / (minCompactedFileSizeInMB.toLong * 1024 * 1024)).toInt)
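        // Worked example (illustrative numbers, not from the original source): with ~1 GB of input
        // (1073741824 bytes) and minCompactedFileSizeInMB = 128, the target is
        // floor(1073741824 / 134217728) = 8 output files; input smaller than one target-sized file
        // still yields a single output file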
        // (2) Read input, coalesce, and persist to the output directory with the Oozie workflow ID as the directory name
        sparkSession.read.parquet(inputDirectory).coalesce(outputFileCount).write.parquet(outputDirectory + "/" + oozieWorkflowID)

        // (3) No errors, so delete the raw logs in the input directory by iterating over the FileStatus array
        inputDirFileStatusArr.foreach(x => {
          val dirToDelete: Path = x.getPath
          fs.delete(dirToDelete, true)
        })
      } catch {
        case e: Exception =>
          // Array.toString() would only print the array reference; mkString captures the actual stack trace
          errorString = e.getStackTrace.mkString("\n")
      } finally {
        // TODO: Log somewhere - workflowID | Start timestamp | End timestamp | Status | errorString | Directories compacted count | Directory size | File count and so on
      }
    }
  }
}
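
// Usage sketch (hypothetical values, not from the original source): the job would typically be
// packaged into a jar and launched from an Oozie Spark action or via spark-submit on YARN.
// The jar name, paths, thresholds, and workflow ID below are all illustrative assumptions.
//
//   spark-submit \
//     --class com.chamberlain.bda.util.CompactParsedLogs \
//     --master yarn \
//     compact-parsed-logs.jar \
//     /data/raw/parsed-logs \
//     /data/compacted/parsed-logs \
//     24 \
//     128 \
//     0000123-200101000000000-oozie-oozi-W \
//     WARN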