package org.sec.mvingAvg

//Jai Vekeria

//Import statements for the required libraries
import scala.collection.mutable.{ Map => MMap }
import scala.math._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import kafka.serializer.StringDecoder
import kafka.utils.Time
import java.util.Date
import java.text.SimpleDateFormat
import java.text.DateFormat
import java.util.Calendar
import java.io._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.{ Minutes, Seconds, StreamingContext }
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkContext
import org.apache.commons.math3.exception.MathIllegalArgumentException
import org.apache.commons.math3.exception.util.LocalizedFormats
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.log4j.LogManager
import org.slf4j.Logger
import org.apache.log4j.Level
import com.cloudera.sparkts.models._
import com.cloudera.sparkts.TimeSeriesRDD
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import breeze.linalg.{ SparseVector, DenseMatrix, DenseVector }

/*
 * This program accepts 7 arguments -> brokers, topics, sw, lw, sigma, maxSize, features
 * brokers - Kafka broker list
 * topics - Kafka topic list
 * sw - slide interval of the window, in seconds
 * lw - window duration, in seconds
 * sigma - sigma multiplier for the Bollinger band
 * maxSize - minimum amount of data required to make a prediction
 * features - list of features, each feature followed by its minimum band, e.g. tcp_syn-100
 */
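
/*
 * A hypothetical invocation (jar name, broker address, topic, and threshold values
 * below are example values only, not from the original source):
 *
 *   spark-submit --class org.sec.mvingAvg.multFeat movingAvg.jar \
 *     broker1:9092 netflow 60 900 2 10 tcp_syn-100,in_bytes-5000
 *
 * i.e. a 900-second window recomputed every 60 seconds, sigma = 2, and at least
 * 10 data points required before forecasting.
 */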

object multFeat {
  def main(args: Array[String]) {
    // check command line arguments (all 7 are required by the destructuring below)
    if (args.length < 7) {
      Static.logger.error(s"""
       |Usage: multFeat <brokers> <topics> <sw> <lw> <sigma> <maxSize> <features>
       |  <brokers> is a list of one or more Kafka brokers
       |  <topics> is a list of one or more kafka topics to consume from
       |
       """.stripMargin)
      System.exit(1)
    }

    /*
     * Here the vector ts is passed through the spark-ts time series library, which auto-fits
     * an ARIMA model based on configurable parameters; the array 'arr' is not actually used,
     * it was part of an older piece of code
     */
    def modelPredictionIO(ts: org.apache.spark.mllib.linalg.Vector, arr: Array[Double], modelValue: Int): Array[Double] = {
      val nullArry: Array[Double] = null
      try {
        Static.logger.info("Entering model")
        val arimaModel = ARIMA.autoFit(ts, modelValue, modelValue, modelValue)
        Static.logger.info("Finished Arima Autofitting")
        val forecast = arimaModel.forecast(ts, 1)
        Static.logger.info("Forecast of next window: " + forecast.toArray.mkString(","))
        return forecast.toArray
      } catch {
        case e: org.apache.commons.math3.linear.SingularMatrixException => Static.logger.warn("Singular matrix error")
        case e: org.apache.commons.math3.exception.TooManyEvaluationsException => Static.logger.warn("Too many calculations")
        case e: java.lang.IllegalArgumentException => Static.logger.warn("Illegal start of expression")
        case e: java.lang.Exception =>
          Static.logger.warn("Stationary not achieved")
          return nullArry
      }
      return nullArry
    }
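
    // Usage sketch (hypothetical values, not part of the streaming flow): a vector of
    // windowed values is forecast one step ahead; modelValue is used as the maximum
    // p, d and q orders handed to spark-ts ARIMA.autoFit. A null result means the fit failed.
    // val next = modelPredictionIO(Vectors.dense(Array(1.0, 2.0, 4.0, 7.0)), Array.empty[Double], 1)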

    /* Bollinger Band Class
     * This is the Bollinger band class which defines the Bollinger Band object; there are three methods.
     * The calcavg method takes the actual values extracted from the data stream and the predicted value
     * and determines the standard deviation. The calcHighBand and calcLowBand methods take both the predicted
     * and the actual values, call the calcavg function to determine the deviation, and then use the configurable
     * sigma parameter to determine the Bollinger band by adding or subtracting sigma times the deviation respectively
     */
    var sigmaValue: Double = 2
    class BollingerBand() {
      private def calcavg(svgBuffer: Double, origVals: Array[Double]): Double = {
        val count = origVals.size
        val mean = origVals.sum / count
        val devs = origVals.map(score => (score - mean) * (score - mean))
        val stddev = Math.sqrt(devs.sum / count)
        return stddev
      }

      //Calculates the high band
      def calcHighBand(svg: Double, arr: Array[Double]): Double = {
        var BollBandStd = calcavg(svg, arr)
        val HighBollingerBand = svg + sigmaValue * BollBandStd
        Static.logger.info("HighBollingerBand: " + HighBollingerBand)
        return HighBollingerBand
      }

      //Calculates the low band
      def calcLowBand(svg: Double, arr: Array[Double]): Double = {
        var BollBandStd = calcavg(svg, arr)
        val LowBollingerBand = svg - sigmaValue * BollBandStd
        Static.logger.info("LowBollingerBand: " + LowBollingerBand)
        return LowBollingerBand
      }
    }
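
    // A minimal worked example (hypothetical values): for actual readings Array(1.0, 2.0, 3.0)
    // the mean is 2.0 and the population stddev is sqrt(((1-2)^2 + (2-2)^2 + (3-2)^2) / 3) ≈ 0.816,
    // so with a predicted value of 2.0 and sigmaValue = 2:
    //   new BollingerBand().calcHighBand(2.0, Array(1.0, 2.0, 3.0))  // ≈ 2.0 + 2 * 0.816 = 3.63
    //   new BollingerBand().calcLowBand(2.0, Array(1.0, 2.0, 3.0))   // ≈ 2.0 - 2 * 0.816 = 0.37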

    //Initialization of variables and the Kafka stream
    /*
     * Essentially here, the Kafka stream is initialized and the command line arguments are processed into their respective arrays and
     * variables
     */
    Static.logger.info("Command Line Arguments:")
    args.foreach { x => Static.logger.info(x) }
    val Array(brokers, topics, sw, lw, sigma, maxSize, features) = args
    Static.logger.info("Arguments " + args.mkString(","))
    sigmaValue = sigma.toDouble
    Static.logger.info("Sigma " + sigmaValue)

    val maxThreshold = new RandomAccessFile(new File("minThresholds.txt"), "rw")

    /*
     * Separating the features from the minimum thresholds input on the command line,
     * then place the features in newFeatArray and place the thresholds into a txt file,
     * minThresholds.txt
     */
    Static.logger.info("Features: " + features)
    val multFeatArray = features.split(",")
    val newFeatArray = new ArrayBuffer[String]
    for (i <- 0 until multFeatArray.size) {
      var tempFeatureArray = multFeatArray(i).split("-")
      maxThreshold.skipBytes(maxThreshold.length().toInt)
      newFeatArray.+=(tempFeatureArray(0))
      maxThreshold.writeChars(tempFeatureArray(0) + "," + tempFeatureArray(1) + "\n")
    }
    maxThreshold.close()
    /*
     * Declaring spark context and kafka stream
     */
    val sparkConf = new SparkConf().setAppName("MovingAverageStream").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet).map(_._2)

    Static.logger.info("This is newFeatArray " + newFeatArray)
    newFeatArray.foreach(x => print("This is an element of the new feature array " + x + "\n"))

    /*
     * Converting the raw JSON netflows into map format
     */
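    // A hypothetical input record for illustration (field names inferred from the
    // processing below; the exact nesting of src_addr_space is an assumption):
    //   {"netflow": {"ipv4_src_addr": "10.0.0.5", "tcp_syn": "3"}, "src_addr_space": "private"}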
    val fil = messages.map { x =>
      {
        val jsonMapper = new ObjectMapper
        jsonMapper.registerModule(DefaultScalaModule)
        lazy val data = {
          try {
            jsonMapper.readValue(x, classOf[Map[String, Any]])
          } catch {
            case e: Exception => { Static.logger.error("\n\nError while processing JSON : Error : " + e + ", Json : " + x); null }
          }
        }
        val in_flag = System.currentTimeMillis()
        data + ("event_received_time" -> in_flag)
      }
    }

    //Merges the netflow map nested in the larger map with the rest of the map so there are no nested maps in the processing stream, basically just one large un-nested map
    val flatenedMapStream = fil.map(x => {
      var netflowMapOption = x.get("netflow").map { case m2: Map[String, _] => m2 }
      var netflowMap = netflowMapOption.get
      var myMutableMap = collection.mutable.Map() ++ x
      var removedNetflowMap = myMutableMap.-("netflow")
      var flattenedCombinedMap = netflowMap ++ removedNetflowMap
      //println("This is the removedNetflowMap " + removedNetflowMap)
      flattenedCombinedMap
    })

    //Keep only flows whose source address space is private
    val privateFil = flatenedMapStream.filter(x => x.exists(_ == "src_addr_space" -> "private"))

    /*
     * Extracting features from the map, i.e. tcp_syn, in_bytes etc., to form a new map only containing the features
     * specified. A map is output at the end with the IP as the key and the features and their values in another
     * map as the value of the outer map, i.e. HashMap[String, HashMap[String, String]]
     */
    val nestedIpMap = privateFil.map(x => {
      var organizedFeatureArray: HashMap[String, String] = HashMap()
      newFeatArray.foreach(i => {
        if (x.getOrElse(i, "0").toString().toDouble > 0) {
          organizedFeatureArray.+=(i.toString() -> x.getOrElse(i, "0").toString())
        } else
          organizedFeatureArray.+=(i.toString() -> "0.0")
      })
      HashMap(x.getOrElse("ipv4_src_addr", "0.0.0.0").toString() -> organizedFeatureArray)
    })
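
    // The result per record looks like this (hypothetical values):
    //   HashMap("10.0.0.5" -> HashMap("tcp_syn" -> "3.0", "in_bytes" -> "1500.0"))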

    /*Convert the data structure described above into a tuple
     *
     */
    val newIP = nestedIpMap.map(x => {
      val xi = x.keySet.toArray
      val value: HashMap[String, String] = x.getOrElse(xi(0), null)
      Tuple2(xi(0), value)
    })

    //Aggregate 10 seconds of data and reduce to a single map per IP
    val newStream = newIP.reduceByKey((x, y) => {
      var tempMap = HashMap[String, String]()
      newFeatArray.foreach(i => {
        if (x.contains(i) && y.contains(i))
          tempMap.put(i, (x.getOrElse(i, "NaN").toDouble + y.getOrElse(i, "NaN").toDouble).toString())
      })
      tempMap
    })
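
    // For example (hypothetical feature values), two batches for the same IP,
    // Map("tcp_syn" -> "2.0") and Map("tcp_syn" -> "3.0"), reduce to Map("tcp_syn" -> "5.0").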

    //Printing out the values to the csv every 10 seconds
    newStream.foreachRDD(rdd => rdd.foreach(map => {
      map._2.keySet.foreach(key => {
        var predictionFile = new RandomAccessFile(map._1 + "_" + key + "prediction" + ".csv", "rw")
        var str = "N/A"
        if (predictionFile.length() > 1) {
          str = predictionFile.readLine().toString()
        }
        predictionFile.close()
        val dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        val date = new Date();
        val hshMap = map._2.getOrElse(key, null)
        val writeCharsr = new RandomAccessFile(new File(map._1 + "_" + key + ".csv"), "rw")
        writeCharsr.skipBytes(writeCharsr.length().toInt)
        writeCharsr.writeChars(dateFormat.format(date) + "," + hshMap + "," + str + "\n")
        writeCharsr.close()
      })
    }))

    try {
      /*
       * This is the window declaration; the window duration is represented by the command line parameter lw and the slide
       * interval by sw, e.g. if the duration is 15min and the sliding window is 1min then the window will compute
       * every 1min with the last 15min of data
       */
      val win2 = newStream.window(Seconds(lw.split(",")(0).toLong), Seconds(sw.split(",")(0).toLong))

      //Reducing the window of data into a comma-separated value string per unique IP and feature for processing
      val window2ProcessingStream = win2.reduceByKey((x, y) => {
        var tempMap = HashMap[String, String]()
        newFeatArray.foreach(i => {
          if (x.contains(i) && y.contains(i)) {
            var tempArray = new ArrayBuffer[String]
            tempArray.+=(x.getOrElse(i, "NaN"))
            tempArray.+=(y.getOrElse(i, "NaN"))
            var valueArray = tempArray.toArray
            var valueString = valueArray.mkString(",")
            tempMap.put(i, valueString)
          }
        })
        tempMap
      })

      // pass the data through the prediction and band creation functions
      val finalData = window2ProcessingStream.map(bigMapIn => {
        //Initializing the required arrays used for the calculation
        var movBuffer = new ArrayBuffer[Array[Double]]
        var newHashArray = new ArrayBuffer[Map[String, Map[String, Array[Double]]]]
        val MovingAverageHash = new ArrayBuffer[Map[String, Map[String, Array[Double]]]]
        var movingAverageArray = new ArrayBuffer[Array[Double]]
        var counter = 0
        var featureArray = new ArrayBuffer[HashMap[String, Array[Double]]]
        var ip = ""

        //Place all the values into an array and filter out all the zeros
        try {
          bigMapIn._2.keySet.toArray.foreach(i => {
            var movAvgBuffer = new ArrayBuffer[Double]
            bigMapIn._2.getOrElse(i, "").split(",").map(_.toDouble).foreach(z => movAvgBuffer.+=(z))
            movAvgBuffer = movAvgBuffer.filter { x => x > 0 }
            if (movAvgBuffer.size > 0) {
              Static.logger.info("movAvgBuffer: " + movAvgBuffer)
              movBuffer.+=(movAvgBuffer.toArray)
              newHashArray.+=(Map(bigMapIn._1 -> Map(i -> movAvgBuffer.toArray)))
            }
          })
        } catch {
          // ArrayIndexOutOfBoundsException is a subclass of IndexOutOfBoundsException, so match it first
          case e: java.lang.ArrayIndexOutOfBoundsException => Static.logger.warn("Feature not available")
          case e: java.lang.IndexOutOfBoundsException => Static.logger.warn("Feature not available")
        }

        Static.logger.info("newHashArray: " + newHashArray)
        Static.logger.info("movBuffer size: " + movBuffer.size)

        // create Moving Average Hash: each feature's raw values are replaced with their running (cumulative) averages
        newHashArray.foreach(u => {
          var fmovAvgBuffer = new ArrayBuffer[Double]
          u.values.foreach(k => {
            k.values.foreach({ n =>
              val temAvg = new ArrayBuffer[Double]
              var avg: Double = 0
              n.foreach(o => {
                temAvg.+=(o)
                avg = temAvg.sum / temAvg.size
                fmovAvgBuffer.+=(avg)
              })
            })

            movingAverageArray.+=(fmovAvgBuffer.toArray)
            val nKeyArray = k.keySet.toArray
            val nKey = nKeyArray(0)
            var keyArr = u.keySet.toArray
            var key = keyArr(0)
            MovingAverageHash.+=(Map(key -> Map(nKey -> fmovAvgBuffer.toArray)))
          })
        })
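
        // Example (hypothetical values): raw feature values Array(2.0, 4.0, 6.0) become the
        // running averages Array(2.0, 3.0, 4.0), since each entry is the mean of all values so far.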

        Static.logger.info("MovingAverageHash: " + MovingAverageHash + " Size: " + MovingAverageHash.size)
        Static.logger.info("movingAverageArray: " + movingAverageArray.size)

        /*
         * Here we are looping through each of the maps and extracting each array from each feature for each IP,
         * then vectorizing them and pumping them through the ARIMA model; after, these returned values are run through
         * the Bollinger band class where the bands are calculated
         */
        for (a <- 0 until MovingAverageHash.size) {
          Static.logger.info("Current MovingAverageHash: " + MovingAverageHash(a))
          MovingAverageHash(a).values.foreach(p => {
            var sKeyArr = p.keySet.toArray
            var sKey = sKeyArr(0)
            Static.logger.info("Feature: " + sKey)
            //Loop through each of the features
            p.values.foreach(v => {
              var AverageArray = movingAverageArray(a)
              val key = MovingAverageHash(a).keySet.toArray
              val keyVal = key(0)
              ip = keyVal
              println("This is the value of v: " + v.mkString(" "))
              //Making sure that there is an appropriate amount of data, set as a command line parameter, in each array
              if (v.size > maxSize.toDouble) {
                val vect = Vectors.dense(v.toArray)
                var predictionArray: Array[Double] = null
                val modelValue = 1
                Static.logger.info("Vector for model: " + vect)
                Static.logger.info("Mean " + (v.sum / v.size))
                //Make sure that the data is not static
                if ((v.sum / v.size) != v(0)) {
                  predictionArray = modelPredictionIO(vect, movBuffer(a), modelValue)
                }
                println("Doing stuff now")
                val actualValArr = movBuffer(a)
                counter = counter + 1
                //Here we check that a prediction vector has been created and then proceed with the Bollinger band calculation
                if (predictionArray != null) {
                  println("This is the prediction array: " + predictionArray.mkString(" "))
                  var boll = new BollingerBand()
                  var highband = boll.calcHighBand((predictionArray.sum / predictionArray.size), actualValArr)
                  var lowband = boll.calcLowBand((predictionArray.sum / predictionArray.size), actualValArr)
                  var finalHashMap = new HashMap[String, Array[Double]]
                  var exArray = new ArrayBuffer[Double]
                  exArray.+=(predictionArray.sum / predictionArray.size)
                  exArray.+=(highband)
                  exArray.+=(lowband)
                  finalHashMap.put(sKey, exArray.toArray)
                  featureArray.+=(finalHashMap)
                  Static.logger.info("Done printing to csv")
                }
              }
            })
          })
        }
        Tuple2(ip, featureArray)
      })

      //Combining both feature maps into one single map and into a tuple
      val editedFinalStream = finalData.map(x => {
        var CombinedMap = new HashMap[String, Array[Double]]
        for (i <- 0 until x._2.size) {
          CombinedMap = x._2(i) ++ CombinedMap
        }
        (x._1, CombinedMap)
      })

      //Updating a prediction csv every sliding window and checking the min threshold file to assure that the threshold is above a certain value
      editedFinalStream.foreachRDD(x => x.foreach(y => {
        y._2.keySet.foreach(map => {
          /*val maxThreshold = new RandomAccessFile(new File("minThresholds.txt"), "rw")
          var maxThresholdBuffer = new ArrayBuffer[String]
          maxThreshold.seek(0)
          for(i <- 1 until maxThreshold.length.toInt)
            maxThresholdBuffer.+=(maxThreshold.readLine())
          println("This is the contents of the maxThresholdBuffer: " + maxThresholdBuffer)
          println("This is the content of first element of Buffer: " + maxThresholdBuffer(0).toString())
          */
          var arr = y._2.getOrElse(map, null)
          Static.logger.info("Printing to csv")
          val writeCharsr = new RandomAccessFile(new File(y._1 + "_" + map + "prediction" + ".csv"), "rw")
          writeCharsr.seek(0)
          writeCharsr.writeChars(arr(0) + "," + arr(1) + "," + arr(2))
          writeCharsr.close()
        })
      }))
    } catch {
      case e: java.io.NotSerializableException => Static.logger.warn("Probably not enough netflow hits in the window")
      case e: java.lang.IndexOutOfBoundsException => Static.logger.warn("Not enough data for processing")
    }
    ssc.start()
    ssc.awaitTermination()
  }

}